Source code for schrodinger.application.desmond.config

"""
Utilities for handling Desmond config files.

Copyright Schrodinger, LLC. All rights reserved.
"""

# Contributors: Yujie Wu

import collections
import warnings
from copy import deepcopy
from past.utils import old_div

import schrodinger.application.desmond.cmj as cmj
import schrodinger.application.desmond.cms as cms
import schrodinger.application.desmond.fep_schedule as fep_schedule
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.infra import mm

FEP_LAMBDA_SCHEMES = [
    "default", "flexible", "charge", "quickcharge", "superquickcharge"
]

DEFAULT_CONFIG = """{
Desmond = {
   config_version = 3
}

app       = "mdsim"
boot.file = ""


global_cell = {
   reference_time       = 0.0
   topology             = periodic
   n_replica            = 1
   partition            = [1 1 1]
   r_clone              = 5.903366
   clone_policy         = rounded
   margin               = 1.0
  #est_pdens            = 0.1
  #est_n_atom_per_voxel = 1.0
}

force = {
   constraint = {
      tol   = 1e-08
      maxit = 8
   }
   nonbonded = {
      sigma  = 2.791856
      r_cut  = 9.0
      n_zone = 1024

      near = {
         type  = default               # default | force-only | table | alchemical:softcore | binding:softcore
         r_tap = 9.0
         taper = none                  # none | shift | c1switch | c2switch
        #average_dispersion = 69.5     # In kcal/mol A^6. If not set, it will be caculated by Desmond.
         correct_average_dispersion = true
      }

      far = {
         type     = pme                # none | pme | gse
         sigma_s  = 0.85
         r_spread = 4.0
         n_k      = [16 16 16]
         order    = [4 4 4]
      }
   }

   bonded  = {}
   virtual = {}

   ignore_com_dofs = false             # subtract 3 from the degrees of freedom.

   term = {
      list  = []
      gibbs = {
         type      =  none             # none | alchemical | binding
         alpha_vdw =  0.5
         window    =  -1               # window index
         weights   = {
            N     = 1                  # number of windows
            vdw   = []                 # binding
            es    = []                 # binding
            vdwA  = []                 # alchemical
            vdwB  = []                 # alchemical
            qA    = []                 # alchemical
            qB    = []                 # alchemical
            qC    = []                 # alchemical
            bondA = []                 # alchemical
            bondB = []                 # alchemical
         }
         output = {
            name     = "fep.dE"
            first    = 0.0
            interval = 0.1
         }
      }
   }
}

migration = {
   first    = 0.0
   interval = 0.012
}

integrator = {
   type        =  Multigrator   # Multigrator | Ber_NPT | Ber_NVT | MTK_NPT | NH_NVT | V_NVE | L_NVT | L_NPT | Piston_NPH | anneal
   dt          =  0.002
   temperature = {T_ref = 300.0}

   respa       = {
      near_timesteps  = 1
      far_timesteps   = 3
      outer_timesteps = 3
   }

   pressure = {
      isotropy               = isotropic    # anisotropic | constant_area | isotropic | semi_isotropic | flexible | monoclinic
      max_margin_contraction = 0.9
      P_ref                  = 1.01325
     #tension_ref            = 4E3
   }

   Multigrator = {
      thermostat = {                   # none or a map
         type      = NoseHoover        # Langevin | NoseHoover
         timesteps = 12
         Langevin  = {
            tau  = 0.016129            # Gamma = 1/tau = 62 is default -- collision rate of water.
            seed = 2012
         }
         NoseHoover = {
            mts  = 2
            tau  = [0.1 0.1 0.1]
         }
      }
      barostat = {                     # none or a map
         type      = MTK
         timesteps = 48
         MTK       = {
            T_ref      = 300.0
            tau        = 0.05
            thermostat = {
               type       = NoseHoover   # NoseHoover | Langevin
               NoseHoover = {
                  mts  = 2
                  tau  = [0.1 0.1 0.1]
               }
               Langevin  = {
                  tau  = 0.016129        # Gamma = 1/tau = 62 is default -- collision rate of water.
                  seed = 2012
               }
            }
         }
      }
      nve.type = Verlet   # none | Verlet | PLS
   }

   MTK_NPT = {
      barostat = {
         tau = 2.0
         thermostat = {
            mts =  2
            tau = [1.0 1.0 1.0 ]
         }
      }
      thermostat = {mts = 2 tau = [1.0 1.0 1.0 ]}
   }

   NH_NVT = {
      thermostat = {mts = 2 tau = [1.0 1.0 1.0 ]}
   }

   Ber_NPT = {
      tau                  = 1.0
      min_velocity_scaling = 0.85
      max_velocity_scaling = 1.2
      barostat = {
         tau                      =  2.0
         kappa                    =  4.5e-05
         min_contraction_per_step =  0.95
         max_expansion_per_step   =  1.1
      }
   }

   Ber_NVT = {
      tau                  = 1.0
      min_velocity_scaling = 0.85
      max_velocity_scaling = 1.2
   }

   L_NVT = {
      thermostat = {
         tau  = 0.016129      # Gamma = 1/tau = 62 is default -- collision rate or water.
         seed = 2007
      }
   }

   L_NPT = {
      thermostat = {
         tau  = 0.016129      # Gamma = 1/tau = 62 is default.
         seed = 2007
      }
      barostat = {
         tau   = 1.0
         T_ref = 300.0
         thermostat = {
            tau  = 0.016129
            seed = 2007
         }
      }
   }

   brownie_NVT = {
      thermostat.seed = 2014
      delta_max       = 0.1
   }

   brownie_NPT = {
      thermostat.seed = 2014
      delta_max       = 0.1
      barostat = {
         thermostat.seed = 2014
         tau   = 0.016129
         T_ref = 300.0
      }
   }

   Piston_NPH = {
      barostat = {
         tau   = 1.0
         T_ref = 300.0
      }
   }

   V_NVE = {}
}


PLUGIN_COMMON = {
   list = [status randomize_velocities remove_com_motion eneseq trajectory maeff_output simbox_output]
   #
   anneal = {
      type     = anneal
      first    = 0.0
      interval = 1.2
      schedule = {
         time  = [0.0 30.0  60.0  90.0  600.0]
         value = [0.0 300.0 600.0 900.0 300.0]
      }
   }
   #
   status = {
      type     = status
      first    = 0.0
      interval = 1.2
   }
   #
   eneseq = {
      type     = eneseq
      name     = "system.ene"
      first    =  0.0
      interval =  1.2
   }
   #
   trajectory = {
      type            = trajectory
      name            = "out_trj"
      write_velocity  = false
      first           = 0.0
      interval        = 4.8
      center          = []
      glue            = []
      mode            = noclobber     # append | noclobber | clobber
      periodicfix     = true
      write_last_step = true
      frames_per_file = 250
      write_last_vel  = false
   }
   #
   maeff_output = {
      type             = maeff_output
      first            = 0.0
      interval         = 120.0
      name             = ""
      write_last_step  = true
      periodicfix      = true
      precision        = 8
      full_system_only = false
      trjdir           = ""
      center_atoms     = ""
   }
   #
   maeff_snapshot = {
      type     = maeff_snapshot
      name     = ""
      first    = 0.0
      interval = 1.2
   }
   #
   randomize_velocities = {
      type        = randomize_velocities
      first       = 0.0
      interval    = inf
      seed        = 2007
      temperature = 300.0
   }
   #
   simbox_output = {
      type     = simbox_output
      name     = ""
      first    = 0.0
      interval = 1.2
   }
   #
   energy_groups = {
      type         = energy_groups
      name         = ""
      first        = 0.0
      interval     = 1.2
      options      = [corr_energy ]        # [pressure_tensor corr_energy self_energy]
      write_report = true
   }
   #
   remove_com_motion = {
      type     = remove_com_motion
      first    = 0.0
      interval = inf
   }
   #
   ptensor = {
      type         = ptensor
      name         = ""
      first        = 0.0
      interval     = 1.2
      print_volume = false
   }
   #
   dipole_moment = {
      type         = dipole_moment
      first        = 0.0
      interval     = 1.2
      name         = ""
   }
}


CHECKPT_COMMON = {
   name             = "checkpt.cpt"
   first            = 0.0
   interval         = 240.06
   wall_interval    = 7200.0
   write_first_step = false
   write_last_step  = true
}


mdsim = {
   title     = "Desmond MD simulation"
   last_time =  1200.0               # In the unit of ps
   checkpt   = "@*.CHECKPT_COMMON"
   plugin    = "@*.PLUGIN_COMMON"
}

reinit = {
   title     = "Desmond reinit/MD simulation"
   last_time = inf
   checkpt   = "@*.CHECKPT_COMMON"
   plugin    = "@*.PLUGIN_COMMON"
   frameset = {
       first    = 0.0
       interval = 1200.0
       configurations_file = ""
   }
}

remd = {
   title         = "Desmond REMD simulation"
   last_time     = 1200.0
   first         = 1.2
   interval      = 1.2
   exchange_type = neighbors     # neighbors | random
   seed          = 2008          # random number seed
   cfg           = []
   checkpt       = "@*.CHECKPT_COMMON"
   plugin        = "@*.PLUGIN_COMMON"
   deltaE        = {
      first    = "@*.first"
      interval = "@*.interval"
      name     = "deltaE.txt"
   }
}

vrun = {
   title    = "Desmond Simulation"
   input    = frameset               # bootfile | frameset (|stdin to be a feature added later)
   checkpt  = "@*.CHECKPT_COMMON"
   plugin   = "@*.PLUGIN_COMMON"
   last_time= inf
   frameset = {
      first     = 0.0
      interval  = 0.0
      name      = ""
      last_time = "@*.last_time"
   }
}

minimize = {
   m        = 3                 # Number of L-BFGS vectors to use
   maxsteps = 200               # Max number of iterations
   tol      = 1.0               # Converging criteria for gradient norm (kcal/mol)
   stepsize = 0.005             # Norm of first step
   switch   = 25.0              # Minimal gradient norm before switching to LBFGS (kcal/mol/Angstrom)
   sdsteps  = 10                # Min number of initial SD steps
   plugin   = {
      list = [maeff_output eneseq]
      #
      eneseq = {
         type   = eneseq
         name     = "system.ene"
         first    =  0.0
         interval =  1.2
      }
      #
      maeff_output = {
         type             = maeff_output
         first            = inf
         interval         = inf
         name             = ""
         write_last_step  = true
         periodicfix      = true
         precision        = 8
         full_system_only = false
         trjdir           = ""
         center_atoms     = ""
      }
      #
      maeff_snapshot = {
         type     = maeff_snapshot
         name     = ""
         first    = 0.0
         interval = 1.0
      }
      energy_groups = {
         type         = energy_groups
         name         = ""
         first        = 0.0
         interval     = 1.2
         options      = [corr_energy ]        # [pressure_tensor corr_energy self_energy]
         write_report = true
      }
   }
}

gui = {
   ewald_tol = 1E-9
}
}"""


[docs]def extract_cfg(cpt_fname, min_size=2 * 1048576, return_raw=False): """ Extracts the extended ark content from a checkpoint file. This function returns a `sea.Map` object or None if parsing fails. The ark as represented by the string contains the following values: - argv : the argument vector - boot_timestamp : the boot time for the program - chemical_time : chemical time in pico seconds - config : the ark file - threader_size : the number of threads - version : the desmond version - world_size : the number of processors """ import re r""" "\{ - look for opening bracket \s?\w{3,}\s=\s - followed by possibly a space, must have 3 or more alphanumeric or underscore characters, followed by ' = '. This prevents junk at the start such as the '{12' in '{12{ORIG_CFG' from being matched. [ -~\s]{300,} - followed by ' -~\s' 300 or more times. ' -~' means any printable character and \s is all whitespace. \}" """ CFG_PATTERN = re.compile(br"\{\s?\w{3,}\s=\s[ -~\s]{300,}\}") cfg = None try: with open(cpt_fname, 'rb') as f: # the desired information is within the min_size bytes buf = f.read(min_size) m = CFG_PATTERN.search(buf) while (m): tmp_cfg = m.group(0).decode() tmp_cfg_filtered = '' # Match { and } brackets = [] for c in tmp_cfg: if c == '{': brackets.append(c) if c == '}': brackets.pop() tmp_cfg_filtered += c if len(brackets) == 0: break tmp_cfg = tmp_cfg_filtered if (tmp_cfg.find("argv") > -1 and tmp_cfg.find("threader_size") > -1 and tmp_cfg.find("config") > -1): raw_cfg = sea.Map(tmp_cfg) if not return_raw and 'ORIG_CFG' in raw_cfg.config: cfg = raw_cfg.config.ORIG_CFG if 'chemical_time' in raw_cfg: cfg.elapsed_time.val = raw_cfg.chemical_time.val else: # old config format does not have 'ORIG_CFG' # just extract the whole config block cfg = raw_cfg.config break m = CFG_PATTERN.search(buf, m.end()) except IOError: pass return cfg
[docs]def has_plugin(desmond_exec, plugin_name): """ """ try: plugin_list = desmond_exec.plugin.list.val return (plugin_name in plugin_list) except AttributeError: pass return False
[docs]def add_plugin(desmond_exec, plugin_name, position=None): """ """ if (position is None): plugin_list = desmond_exec.plugin.list if (plugin_name in plugin_list.val): return plugin_list.append(plugin_name) else: remove_plugin(desmond_exec, plugin_name) desmond_exec.plugin.list.insert(position, plugin_name)
[docs]def remove_plugin(desmond_exec, plugin_name): """ """ plugin_list = desmond_exec.plugin.list to_be_removed = [] for i, p in enumerate(plugin_list): if (p.val == plugin_name): to_be_removed.append(i) to_be_removed.reverse() for i in to_be_removed: plugin_list.pop(i)
[docs]def get_homebox(box, cpu_top): """ """ x = cpu_top[0] y = cpu_top[1] z = cpu_top[2] a = [ old_div(box[0], x), old_div(box[3], y), old_div(box[6], z), ] b = [ old_div(box[1], x), old_div(box[4], y), old_div(box[7], z), ] c = [ old_div(box[2], x), old_div(box[5], y), old_div(box[8], z), ] return a + b + c
[docs]def is_triclinic_box(box): """ """ box_ = deepcopy(box) box_[0] = 0.0 box_[4] = 0.0 box_[8] = 0.0 for b in box_: if (b != 0.0): return True return False
# lambda exp( x/2.95) * 0.024-0.024 # 0 0 # 1 0.0096844694 # 2 0.023276811 # 3 0.04235393 # 4 0.069129038 # 5 0.10670843 # 6 0.15945183 # 7 0.23347823 # 8 0.33737574 # 9 0.48319791 # 10 0.68786219 # 11 0.9751125
[docs]def optimize_key(msj, cfg, model): """ Optimizes the simulation parameters in 'cfg', where 'cfg' must represent a complete config file. """ import math r_cut = cfg.force.nonbonded.r_cut.val cpu = cfg.global_cell.partition.val dt = cfg.integrator.dt.val far_ts = cfg.integrator.respa.far_timesteps.val far_dt = dt * far_ts out_dt = dt * cfg.integrator.respa.outer_timesteps.val epsilon = min(cfg.gui.ewald_tol.val, 0.1) # Below is the idea how to optimize the margin. -YW # 1. The "margin" parameter's value can be optimized as k * rmsm + C, where k is a prefactor, rmsm is the root mean square # motion, C is the maximum distance that a particle can travel without colliding to another one. Below, let's first # consider the root mean square motion, and then k and C. # 2. The root-mean-square motion (rmsm) can be calculated from the diffusion constant using the equation: # rmsm = sqrt( <x^2> - <x>^2 ) = sqrt( 6 * D * dt ) # where D is diffusion constant, and dt is the time interval. # 3. How to get dt and D? # - dt is easy. We just get its value from `cfg' since it is a settable parameter. # - D is a little bit troublesome. First of all, different molecules have different diffusion constants. In principle, # We should use the largest D, but unfortunately we don't rigorously know what the largest D is. Nevertheless, for # aqueous systems, it is probably safe to assume water is the most diffusive molecule. So we use water's D. # - A more difficult problem is that D is temperature-dependent. For water molecules, D = 0.0187 (T = 242.5 K), # 0.105 (T = 273.5 K), 0.223 (T = 298.0 K), 0.3575 (T = 318.0 K). The unit of D here is A^2/ps. So we need to somehow # predict D based on the temperature. For this, we can use the Frenkel-Eyring theory, which basically says # D = A * exp( -E / RT ) # where A and E presumably are constants (this assumption is an oversimplification, but should be good enough for our # purpose here). By fitting to the experimental data, we get A = 807.015 A^2/ps and E = 4.87 kcal/mol. # 4. For very high temperature, D is overestimated a lot by the Frenkel-Eyring theory. So we use the following empirical # formula to correct the result: D = D - 0.03 * (T - 350), for T > 350. # 5. How do we choose value for the prefactor k? # - First of all, commonly used water models such as TIP3P and SPC tend to overestimate water diffusion constant. So to # be safe, we should assume the D of water model is at least 2 times the experimental value. # - Second, remember that mean square displacement formula for D is correct only for time -> inf; for time -> 0, the # value would be much larger, presumedly ~100% larger. # - The above thoughts lead to a working value for diffusion constant = 4 D, where D is the experimental value. So this # gives k = sqrt( 4 ) = 2. # 6. How about C? # - For the value of C, we can look at the radial distribution function. The width of the first peak is the maximum # radial space that first shell of water molecules can occupy. Because for any direction only one molecule can occupy # that radial space, we can safely take the width as the value of C. For liquid water, the value is ~0.4 Angstrom. # Gets the highest temperature. desmond_exec = cfg[cfg.app.val] if (has_plugin(desmond_exec, "anneal")): temp = max(desmond_exec.plugin.anneal.schedule.value.val) else: if ("T_groups" in cfg.integrator.temperature): temp_list = [] for e in cfg.integrator.temperature.T_groups: temp_list.append(e.T_ref.val) temp = max(temp_list) else: temp = cfg.integrator.temperature.T_ref.val box_x, box_y, box_z = cms.get_boxsize(model.box) homebox = get_homebox(model.box, cpu) diffusion = 807.015 * math.exp(-4.87 / 1.986E-3 / temp) - 0.03 * max( 0, temp - 350) migration_interval = cfg.migration.interval.val rmsm = math.sqrt(6 * diffusion * migration_interval) margin = rmsm * 2.0 + 0.4 # We don't want to transfer a small amount of data between nodes very frequently. This is because of the transfer latency # overhead, which might exceed the actual data-transferring time. On the other hand we don't want a big margin either, # because a lot of particle pairs would have distances larger than the cutoff. This thought leads to a further checking: # If the margin is still ``big'' and the migration interval can be decreased, we will reduce the margin further more. # How ``big'' is ``big''? # - An arbitrary yet reasonable choice is this: If the margin causes > 50% extra computation, we consider it as big. # - pow( 1.5, 1.0/3.0 ) = 1.1447142425533319 while (margin > r_cut * 0.1447142425533319 and migration_interval >= far_dt * 1.9): migration_interval -= far_dt # Adjusts the migration interval to be `N x out_dt', where `N' is an integer number >= 1. N = int(old_div(migration_interval, out_dt)) N = 1 if (1 > N) else N migration_interval = N * out_dt rmsm = math.sqrt(6 * diffusion * migration_interval) margin = rmsm * 2.0 + 0.4 r_clone = (r_cut + margin) * 0.5 cfg.global_cell.margin.val = margin cfg.migration.interval.val = migration_interval r_cut4 = r_cut * 4 if (is_triclinic_box(model.box) and (box_x < r_cut4 or box_y < r_cut4 or box_y < r_cut4)): cfg.global_cell.clone_policy.val = "unrounded" else: cfg.global_cell.clone_policy.val = "rounded" if (cfg.force.nonbonded.far.type.val != "none"): toler = math.sqrt(abs(math.log(epsilon * r_cut))) sigma = old_div(math.sqrt(abs(math.log(epsilon * r_cut * toler))), r_cut) toler = math.sqrt(-math.log(epsilon * r_cut * (2.0 * toler * sigma)**2)) fac = sigma * toler / 3.1415926 n_k = [ 0.25 + fac * box_x, 0.25 + fac * box_y, 0.25 + fac * box_z, ] for i, n in enumerate(n_k): p = cpu[i] n = int(math.ceil(n)) # Each grid dimension must satisfy the following conditions: # 1. if p is given, it must divide n # 2. n must have no integer factors greater than 2, 3, or 5 while (True): # check that n is commensurate with p if (p): if (n % p): n += 1 continue N = n for radix in ( 2, 3, 5, ): while (not N % radix): N /= radix if (N == 1): break n += 1 continue n_k[i] = n cfg.force.nonbonded.far.n_k.val = n_k cfg.force.nonbonded.sigma.val = old_div(1, sigma) pme_order = cfg.force.nonbonded.far.order.val margin_ = old_div(margin, 1.414) if (cfg.force.nonbonded.far.type.val[-3:] == "gse"): r_clone = max(r_clone, cfg.force.nonbonded.far.r_spread.val + margin_) r_lazy = r_clone * 2.0 elif (cfg.force.nonbonded.far.type.val[-3:] == "pme"): a = cpu[0] * (pme_order[0] - 1) * 0.5 / n_k[0] b = cpu[1] * (pme_order[1] - 1) * 0.5 / n_k[1] c = cpu[2] * (pme_order[2] - 1) * 0.5 / n_k[2] tmpx = homebox[0] * a + homebox[1] * b + homebox[2] * c tmpy = homebox[3] * a + homebox[4] * b + homebox[5] * c tmpz = homebox[6] * a + homebox[7] * b + homebox[8] * c r_pme = math.sqrt(tmpx * tmpx + tmpy * tmpy + tmpz * tmpz) + margin_ r_clone = max(r_clone, r_pme + 0.01) mmc = cfg.integrator.pressure.max_margin_contraction.val # Ensures r_clone * max_margin_contraction > r_pme if the simulation is NPT. if (r_clone * mmc < r_pme and -1 < cfg.integrator.type.val.find("_NP")): delta = 0.0 while (old_div(r_pme, r_clone) > 0.97 and delta < 0.3): delta += 0.001 r_clone += 0.001 mmc = old_div(r_pme, r_clone) cfg.integrator.pressure.max_margin_contraction.val = mmc try: if (msj.bigger_rclone.val): r_clone /= mmc except AttributeError: pass r_lazy = r_clone * 2.0 if (r_lazy > old_div(box_x, cpu[0]) and r_lazy > old_div(box_y, cpu[1]) and r_lazy > old_div(box_z, cpu[2])): print("WARNING: Cutoff radius is too large relative to box size.\n" \ " Try either increasing box size or reducing cutoff radius.") cfg.global_cell.r_clone.val = r_clone + 1E-6
[docs]def parse_lambda(s): """ """ try: scheme, num_window = s.split(':') except ValueError: raise ValueError( "value not in the form of '<scheme>:<number of windows>'") scheme = scheme.strip().lower() if (scheme not in FEP_LAMBDA_SCHEMES): raise ValueError("unknown lambda scheme '%s'" % scheme) try: num_window = int(num_window) except ValueError: raise ValueError( "value not in the form of '<scheme>:<number of windows>'") if (num_window < 1): raise ValueError("number of lambda windows is less than 1") return scheme, num_window
[docs]def lambda_type(lambda_): """ """ if (isinstance(lambda_, sea.Atom)): return "generic" has_vdw = ("vdw" in lambda_) has_vdwA = ("vdwA" in lambda_) if (has_vdw and has_vdwA): return "ambiguous" if (not has_vdw and not has_vdwA): return "wrong" if (not has_vdw and has_vdwA): return "alchemical" if (has_vdw and not has_vdwA): return "binding" return "impossible"
[docs]def num_lambda_window(lambda_): """ """ if (isinstance(lambda_, sea.Atom)): scheme, num_window = parse_lambda(lambda_.val) else: try: num_window_vdw = len(lambda_.vdw) except AttributeError: num_window_vdw = None try: num_window_vdwA = len(lambda_.vdwA) except AttributeError: num_window_vdwA = None if (num_window_vdw is not None and num_window_vdwA is not None): raise ValueError("ambiguous lambda schedule") if (num_window_vdw is None and num_window_vdwA is None): raise ValueError("lambda schedule not set") if (num_window_vdw is None): num_window = num_window_vdwA else: num_window = num_window_vdw return num_window
[docs]def get_fep_lambda_schedule(lambda_, fep_type): """ """ if (isinstance(lambda_, sea.Atom)): val = lambda_.val if (isinstance(val, bool) and val): val = "default:12" if (val): scheme, n_win = parse_lambda(val) return get_fep_lambdas(n_win, fep_type, scheme) else: if (fep_type != lambda_type(lambda_)): raise ValueError("Wrong lambda schedule type") if (fep_type == "alchemical"): la = collections.namedtuple( 'Helper', ['vdwA', 'vdwB', 'chargeA', 'chargeB', 'bondedA', 'bondedB']) return la(vdwA=lambda_.vdwA.val, vdwB=lambda_.vdwB.val, chargeA=lambda_.chargeA.val, chargeB=lambda_.chargeB.val, bondedA=lambda_.bondedA.val, bondedB=lambda_.bondedB.val) else: la = collections.namedtuple('Helper', ['vdw', 'coulomb']) return la(vdw=lambda_.vdw.val, coulomb=lambda_.coulomb.val)
def _xchk_check_lambda(map, valid, ev, prefix): """ """ try: parse_lambda(map.val) except ValueError as e: ev.record_error(prefix, "Wrong value: %s" % str(e)) def _xchk_check_iwindow(map, valid, ev, prefix): """ """ try: num_window = num_lambda_window(ev._map.fep.lambda_) except ValueError as e: ev.record_error(prefix, "Wrong values: %s" % str(e)) else: v = map.val if (v >= num_window): ev.record_error( prefix + "[" + str(v) + "]", "Value out of range: expecting within [0, %d), but got '%d'" % ( num_window, v, )) def _xchk_check_annealing(map, valid, ev, prefix): """ """ if (is_remd(map.parent()) and map.val): ev.record_error(prefix, "Cannot run similuated annealing with REMD") def _xchk_check_remd_temp_generator(map, valid, ev, prefix): """ """ try: n = 0 if ("highest_temperature" in map): n += 1 if ("exchange_probability" in map): n += 1 if ("n_replica" in map): n += 1 if (n != 2): raise SyntaxError( "Invalid setting for temperature-ladder generator") except Exception as e: ev.record_error(prefix, "%s" % e) def _xchk_check_remd_temp_generator2(map, valid, ev, prefix): """ """ try: n = 0 if ("generator" in map and "PvdS" != map.generator.val): raise SyntaxError("Invalid temperature-ladder generator") if ("highest_temperature" in map): n += 1 if ("exchange_probability" in map): n += 1 if (n != 1): raise SyntaxError( "Invalid setting for temperature-ladder generator") except Exception as e: ev.record_error(prefix, "%s" % e) def _xchk_check_gcmc_region(param, valid, ev, prefix): if 'whole_box_frequency' in param: # TODO for some reason param.global_switching.has_tag("setbyuser") is # always true even when it is not set, but the filtered keys work. if param.global_switching.keys(tag="setbyuser"): ev.record_error( prefix, "Cannot define both global_switching and deprecated " "whole_box_frequency") return warnings.warn( UserWarning( f"{prefix[1:]}: whole_box_frequency is deprecated, please use " f"global_switching.frequency")) param.global_switching.frequency = deepcopy(param.whole_box_frequency) del param.whole_box_frequency def _xchk_check_forcefield(param, valid, ev, prefix): # Allowed alternate force fields if param.val in ( "CHARMM", "AMBER", "amber03", "amber99", "amber94", "amber96", "amber99SB", "amber99SB-ILDN", "charmm22nocmap", "charmm22star", "charmm36_lipids", "charmm36_protein", "charmm27", "charmm32", "oplsaa_ions_Jensen_2006", "oplsaa_impact_2005", ): return _xchk_check_opls_forcefield(param, valid, ev, prefix) def _xchk_check_opls_forcefield(param, valid, ev, prefix): # Valid OPLS force field names will correctly map to an OPLS version try: mm.opls_name_to_version(param.val) except IndexError as e: ev.record_error(prefix, str(e)) sea.reg_xcheck("check_lambda", _xchk_check_lambda) sea.reg_xcheck("check_iwindow", _xchk_check_iwindow) sea.reg_xcheck("check_annealing", _xchk_check_annealing) sea.reg_xcheck("check_remd_temp_generator", _xchk_check_remd_temp_generator) sea.reg_xcheck("check_remd_temp_generator2", _xchk_check_remd_temp_generator2) sea.reg_xcheck("check_gcmc_region", _xchk_check_gcmc_region) sea.reg_xcheck("check_forcefield", _xchk_check_forcefield) sea.reg_xcheck("check_opls_forcefield", _xchk_check_opls_forcefield) MD = """ ################################################## DATA = { fep = { lambda = "default:12" i_window = ? output = { name = "$JOBNAME$[_replica$REPLICA$].dE" first = 0.0 interval = 1.2 } trajectory = { record_windows = [0 -1] } } box = ? cutoff_radius = 9.0 bigger_rclone = no taper = off coulomb_method = useries temperature = 300.0 annealing = off pressure = 1.01325 surface_tension = 0.0 ensemble = NPT time = 1200.0 elapsed_time = 0.0 timestep = [0.002 0.002 0.006] cpu = 1 glue = "solute" trajectory = { name = "$JOBNAME$[_replica$REPLICA$]_trj" first = 0.0 interval = 4.8 periodicfix = true write_velocity = false frames_per_file= 250 center = [] format = dtr write_last_vel = false } eneseq = { name = "$JOBNAME$[_replica$REPLICA$].ene" first = 0.0 interval = 1.2 } checkpt = { name = "$JOBNAME.cpt" first = 0.0 interval = 240.06 write_last_step = yes } maeff_output = { name = "$JOBNAME$[_replica$REPLICA$]-out.cms" trjdir = "$JOBNAME$[_replica$REPLICA$]_trj" first = 0.0 interval = 120.0 periodicfix = true center_atoms = "solute" } randomize_velocity = { first = 0.0 interval = inf seed = 2007 temperature = '@*.temperature' } simbox = { name = "$JOBNAME$[_replica$REPLICA$]_simbox.dat" first = 0.0 interval = 1.2 } # energy_group = { # name = "$JOBNAME$[_replica$REPLICA$]_enegrp.dat" # first = 0.0 # interval = 1.2 # self_energy = false # corr_energy = true # } energy_group = off pressure_tensor = off dipole_moment = off meta = off meta_file = ? backend = { } restrain = none restraints = { existing = ignore new = [] } } ################################################## VALIDATE = { fep = { lambda = [ {type = str range = [3 10000000000] _check = check_lambda} {vdw = {type = list size = [sizeof @DATA.fep.lambda.coulomb] elem = {type = float0_1}} coulomb = {type = list size = [sizeof @DATA.fep.lambda.vdw ] elem = {type = float0_1}} } {vdwA = {type = list size = [sizeof @DATA.fep.lambda.vdwB ] elem = {type = float0_1}} vdwB = {type = list size = [sizeof @DATA.fep.lambda.chargeA] elem = {type = float0_1}} chargeA = {type = list size = [sizeof @DATA.fep.lambda.chargeB] elem = {type = float0_1}} chargeB = {type = list size = [sizeof @DATA.fep.lambda.bondedA] elem = {type = float0_1}} bondedA = {type = list size = [sizeof @DATA.fep.lambda.bondedB] elem = {type = float0_1}} bondedB = {type = list size = [sizeof @DATA.fep.lambda.vdwA ] elem = {type = float0_1}} } ] i_window = [ {type = none} {type = int0 _check = check_iwindow} ] output = { name = {type = str1} first = {type = "float+"} interval = {type = "float+"} } trajectory = { record_windows = [ {type = list size = 0 elem = {type = int}} {type = enum range = [all]} ] } } box = [{type = list elem = {type = "float+"}} {type = none}] cutoff_radius = {type = "float+"} bigger_rclone = {type = bool} taper = [ {type = bool0} {method = {type = enum range = [shift c1switch c2switch]} width = {type = "float+"} } ] coulomb_method = [ {type = enum range = [pme cutoff useries]} {type = list size = 2 elem = [{type = enum range = [pme]} {type = "float+"}]} ] temperature = [ {_if = ["!" @.DATA.annealing] type = "float+"} {_if = @.DATA.annealing type = list size = -1 elem = {type = list size = 2 elem = {type = "float+"}} } {_if = ["!" @.DATA.annealing] type = list size = -1 elem = {type = list size = 2 elem = [{type = "float+"} {type = int range = [0 7]} ] } } #{_if = @.DATA.replica type = list size = -2 elem = {type = "float+"}} ] annealing = {type = bool _check = check_annealing} pressure = [ {type = "float+"} {type = list size = 2 elem = [ {type = "float+"} {type = enum range = [anisotropic isotropic flexible monoclinic]} ] } ] surface_tension = {type = "float+"} ensemble = [ {type = enum range = [NPT NVT NVE NPgT NPAT NPT_L NVT_L NPT_Brownie NVT_Brownie NVT_DPD]} {class = {type = enum range = [NVE]}} {_enforce = [class method] class = {type = enum range = [NVE NPT NVT NPgT NPAT]} method = {type = enum range = [MTK NH Langevin Brownie DPD]} thermostat = { tau = {type = "float+"} } barostat = { tau = {type = "float+"} } brownie = { delta_max = {type = "float+"} } } ] time = {type = "float+"} elapsed_time = {type = "float+"} timestep = {type = list size = 3 elem = {type = "float+"}} cpu = [ {type = int1} {type = list size = 3 elem = {type = int1}} ] glue = [ {type = enum range = [solute retain none]} #{type = str1} #{type = list size = 2 elem = {type = str1}} ] trajectory = [ {type = bool0} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} periodicfix = {type = bool} write_velocity = {type = bool} write_last_vel = {type = bool} frames_per_file= {type = int1} center = [ {type = list size = 0 elem = {type = int1}} {type = str1} ] format = {type = enum range = [dtr DTR xtc XTC]} } ] eneseq = [ {type = bool0} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} precision= {type = int1} } ] checkpt = [ {type = bool} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} wall_interval = {type = "float+"} write_last_step = {type = bool} } ] maeff_output = [ {type = bool0} {name = {type = str1} trjdir = {type = str1} first = {type = "float+"} interval = {type = "float+"} periodicfix = {type = bool} center_atoms = [ {type = list size = 0 elem = {type = int1}} {type = str} ] } ] randomize_velocity = { _skip = [temperature] first = {type = "float+"} interval = {type = "float+"} seed = {type = int} } simbox = [ {type = bool} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} } ] energy_group = [ {type = bool} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} self_energy = {type = bool} corr_energy = {type = bool} } ] pressure_tensor = [ {type = bool} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} print_volume = {type = bool} } ] dipole_moment = [ {type = bool} {name = {type = str1} first = {type = "float+"} interval = {type = "float+"} } ] meta_file = [{type = str1} {type = none}] meta = [ {type = enum range = [FILE]} {height = {type = "float+"} first = {type = "float+"} interval = {type = "float+"} last = {type = "float+"} kTemp = {type = "float+"} name = {type = str1} cv_name = {type = str } cv = {type = list elem = {type = {type = enum range = [dist angle dihedral rmsd rmsd_alt rmsd_symm zdist0 zdist rgyr rgyr_mass whim1 whim2]} atom = {type = list size = -1 elem = {type = str1}} width = {type = float+} floor = {type = float } wall = {type = float } } size = 0 } } {type = bool0} ] backend = [ {_skip = all} {type = none} ] restrain = [ {type = enum range = [retain none]} {_mapcheck = check_restrain _skip = all} {type = list size = -1 elem = {_mapcheck = check_restrain _skip = all} } ] restraints = { existing = {type = enum range = [retain ignore ignore_posre]} new = {type = list size = 0 elem = {_skip = all}} } }""" MINIMIZE = [ MD, """ DATA = { max_steps = 2000 convergence = 1.0 steepest_descent_steps = 10 num_vector = 3 maeff_output.first = inf maeff_output.name = "$JOBNAME-out.cms" } VALIDATE = { max_steps = {type = int1} convergence = {type = "float+"} steepest_descent_steps = {type = int0} num_vector = {type = int1} }""", ] BROWNIE_MINIMIZE = [ MD, """ DATA = { ensemble.brownie.delta_max = 0.1 ensemble.method = Brownie ensemble.class = NVT ensemble.thermostat.tau = 1.0 time = 100.0 timestep = [0.001 0.001 0.003 ] maeff_output.name = "$JOBNAME-out.cms" maeff_output.first = inf temperature = 10.0 }""", ] REMD = [ MD, """ DATA = { replica = [ {model_file = ? temperature = 300} {model_file = ? temperature = 310} ] } VALIDATE = { replica = [{type = list size = -1} {generator = {type = enum range = [solute_tempering parallel_tempering]} atom = {type = str1} temperature = [ {generator = {type = enum range = [PvdS]} highest_temperature = {type = float+ } exchange_probability = {type = float0_1} n_replica = {type = int0 } _mapcheck = check_remd_temp_generator } {type = list size = -1 elem = {type = float+}} ] } ] } """, ] META = [ MD, """ DATA = { meta = { height = 0.03 first = 0.0 interval = 0.09 name = "$JOBNAME$[_replica$REPLICA$].kerseq" cv_name = "$JOBNAME$[_replica$REPLICA$].cvseq" cv = [] } } """ ] VRUN = [ MD, """ DATA = { vrun_frameset = "$FRAMESET" } VALIDATE = { vrun_frameset = [ {type = none} {type = str1} ] } """, ] REINIT = [ MD, """ DATA = { reinit_frameset = { first = 0.0 interval = 1200.0 configurations_file = "$REINIT_CONFIGURATIONS_FILE" } } VALIDATE = { reinit_frameset = { first = {type = "float+"} interval = {type = "float+"} configurations_file = {type = "str1"} } } """, ] CONCAT = [ MD, """ DATA = { simulate = [] } VALIDATE = { simulate = {type = list} } """ ] GCMC = [ MD, """ DATA = { gcmc = { first = 0.0 interval = 4.8 ene_name = "$JOBNAME$[_replica$REPLICA$]_gcmc.ene" moves = { moves_per_cycle = 5000 # number of trans/rot moves to ins/del moves # 1:2 = 1:1:1 ratio (trans:ins:del) # a:b = a:b/2:b/2 such that ins/del is fixed ### DISABLED UNTIL FUNCTIONALITY RESTORED # trans_to_ins_del_ratio = 0:1 # post_insertion_translation_moves = 0 } # the chemical potential in kcal/mol mu_excess = -6.180 # the number density of the solvent in Angstroms^-3 solvent_density = 0.03262 gcmc_region = { track_voids = true cell_size = .22 exclusion_radius = 2.2 # setting this to a value larger than half the largest box dimension # will cause gcmc to be done over the whole box. region_buffer = 4.0 # randomly switch to performing global GCMC, aka over the whole # simulation box. # has no effect if system is already "whole box" as determined by # the region_buffer and/or the absence of "gcmc_ligand" atoms. global_switching = { # percentage of time to do global frequency = 0.2 # when performing random switches, multiply the number of # attempts by this amount move_factor = 3.0 # when performing random switches, multiply the spacing by this # amount to conserve memory spacing_factor = 2.0 } } verbose = 0 # use random seed by default for multisim for testing statistics seed = random solvent = { s_file = '' } ligand_file = ? } } VALIDATE = { gcmc = { first = {type = "float+"} interval = {type = "float+"} ene_name = {type = "str1"} moves = { moves_per_cycle = {type = int1} # trans_to_ins_del_ratio = {type = str} # post_insertion_translation_moves = {type = int} } mu_excess = {type = float} solvent_density = {type = "float+"} gcmc_region = { track_voids = {type = bool} cell_size = {type = "float+"} exclusion_radius = {type = "float+"} region_buffer = {type = "float+"} global_switching = { frequency = {type = "float0_1"} move_factor = {type = "float+"} spacing_factor = {type = "float+"} } # deprecated in favor of above spec whole_box_frequency = {type = "float0_1"} _mapcheck = check_gcmc_region } verbose = {type = enum range = [0 1 2]} seed = [ {type = int} {type = str} ] solvent = { # we cannot use multisim file exists because the file is not # guaranteed to exist in the launch directory (it may be in the # ALT_DIR). But we don't need to worry, because the ADD_FILE # mechanism checks this anyway, and looks in the ALT_DIR. s_file = {type = str} num_copies = {type = int1} } ligand_file = [ {type = none} {type = str1} ] } }""", ] class _create_when_needed(object): def __init__(self, name, should_return_data=True): self._name = name self._should_return_data = should_return_data def __get__(self, obj, cls): if (self._name == "MD"): obj = sea.Map(MD) else: obj = sea.Map() obj["MD"] = cls.MD obj["VALIDATE"] = cls.VALIDATE_MD obj = deepcopy(obj) obj.update(globals()[self._name]) setattr(cls, self._name, obj.DATA) setattr(cls, "VALIDATE_" + self._name, obj.VALIDATE) return obj.DATA if (self._should_return_data) else obj.VALIDATE
[docs]class DEFAULT_SETTING(object): """ """ MD = _create_when_needed("MD") MINIMIZE = _create_when_needed("MINIMIZE") BROWNIE_MINIMIZE = _create_when_needed("BROWNIE_MINIMIZE") REMD = _create_when_needed("REMD") META = _create_when_needed("META") VRUN = _create_when_needed("VRUN") REINIT = _create_when_needed("REINIT") CONCAT = _create_when_needed("CONCAT") GCMC = _create_when_needed("GCMC") VALIDATE_MD = _create_when_needed("MD", False) VALIDATE_MINIMIZE = _create_when_needed("MINIMIZE", False) VALIDATE_BROWNIE_MINIMIZE = _create_when_needed("BROWNIE_MINIMIZE", False) VALIDATE_REMD = _create_when_needed("REMD", False) VALIDATE_META = _create_when_needed("META", False) VALIDATE_VRUN = _create_when_needed("VRUN", False) VALIDATE_REINIT = _create_when_needed("REINIT", False) VALIDATE_CONCAT = _create_when_needed("CONCAT", False) VALIDATE_GCMC = _create_when_needed("GCMC", False)
[docs]def get_default_setting(msj): """ """ gcmc_map = sea.Map() if is_gcmc(msj): gcmc_map = DEFAULT_SETTING.GCMC if is_minimize(msj): return DEFAULT_SETTING.MINIMIZE if is_remd(msj): return DEFAULT_SETTING.REMD.update(gcmc_map) if is_vrun(msj): return DEFAULT_SETTING.VRUN if is_reinit(msj): return DEFAULT_SETTING.REINIT if is_concat(msj): return DEFAULT_SETTING.CONCAT.update(gcmc_map) return DEFAULT_SETTING.MD.update(gcmc_map)
def _tag(map, tag, keys): for k in keys: if k in map: map[k].add_tag(tag)
[docs]def tag_sim_map(map, sim_tag_spec, **kwargs): tag = kwargs.get("tag", sim_tag_spec.tag) _tag(map, tag, sim_tag_spec.keys)
[docs]class TagSpec(object): """ Represents a tag and its associated keys """
[docs] def __init__(self, tag, remove_keys=()): self.tag = tag self.keys = list(getattr(DEFAULT_SETTING, tag)) for k in remove_keys: try: self.keys.remove(k) except ValueError: pass
class _create_tag_when_needed(object): def __init__(self, *args, **kwargs): self.tag = TagSpec(*args, **kwargs) def __get__(self, obj, cls): return deepcopy(self.tag) _minimize_remove_keys = [ "temperature", "annealing", "pressure", "surface_tension", "ensemble", "time", "elapsed_time", "timestep", "trajectory", "checkpt", "randomize_velocity", "simbox", ] _brownie_remove_keys = ["annealing", "pressure", "surface_tension", "simbox"]
[docs]class TAG_SPECS(object): md = _create_tag_when_needed("MD") minimize = _create_tag_when_needed("MINIMIZE", _minimize_remove_keys) brownie_minimize = _create_tag_when_needed("BROWNIE_MINIMIZE", _brownie_remove_keys) remd = _create_tag_when_needed("REMD", ["annealing"]) meta = _create_tag_when_needed("META") vrun = _create_tag_when_needed("VRUN") reinit = _create_tag_when_needed("REINIT") concat = _create_tag_when_needed("CONCAT") gcmc = _create_tag_when_needed("GCMC")
[docs]def is_minimize(msj): """ """ return ("steepest_descent_steps" in msj)
[docs]def is_remd(msj): """ """ return ("replica" in msj)
[docs]def is_meta(msj): """ """ if ("meta_file" in msj and isinstance(msj.meta, sea.Atom)): return bool(msj.meta_file.val) return ("meta" in msj and isinstance(msj.meta, sea.Map) and 0 < len(msj.meta.cv))
[docs]def is_vrun(msj): """ """ return ("vrun_frameset" in msj and msj.vrun_frameset.val is not None)
[docs]def is_reinit(msj): """ Does `msj` require the "reinit" app? """ return ("reinit_frameset" in msj and isinstance(msj.reinit_frameset, sea.Map))
[docs]def is_concat(msj): """ """ return "simulate" in msj
[docs]def is_gcmc(msj): return "gcmc" in msj
[docs]def is_fep(msj): md_for_fep = msj.val.get("backend", {}).get("is_for_fep", False) return "fep" in msj or md_for_fep
[docs]def get_exec(msj, cfg): """ """ if is_minimize(msj): return cfg.minimize if is_remd(msj): return cfg.remd if is_vrun(msj): return cfg.vrun if is_reinit(msj): return cfg.reinit return cfg.mdsim
[docs]def get_exec_name(msj): """ """ if is_minimize(msj): return "minimize" if is_remd(msj): return "remd" if is_vrun(msj): return "vrun" if is_reinit(msj): return "reinit" return "mdsim"
[docs]def num_replica(msj, model, printer=None): """ """ if (not is_remd(msj)): return 1 if (isinstance(msj.replica, sea.Map)): generator = msj.replica.generator.val if (generator in [ "solute_tempering", "parallel_tempering", ]): if (isinstance(msj.replica.temperature, sea.Map)): ref_temp = msj.temperature.val if (isinstance( msj.temperature, sea.Atom)) else msj.temperature[0][0].val ref_temp = float(ref_temp) asl = msj.replica.atom.val if (generator == "solute_tempering") else "all" temp = gen_temperature_for_solute_tempering( msj.replica.temperature, model, ref_temp, asl, printer) else: temp = msj.replica.temperature num_replica = len(temp) else: num_replica = len(msj.replica) return num_replica
[docs]def get_rest_replica(atom_selection=f"atom.{constants.REST_HOTREGION} 1", exchange_probability=0.3, n_replica=2): """ Return REST-specific replicas :rtype: `sea.Map` """ REST_REPLICA_TEMPLATE = """ replica = { generator = solute_tempering atom = "asl: %s" temperature = { generator = PvdS exchange_probability = %g n_replica = %i } } """ return sea.Map(REST_REPLICA_TEMPLATE % (atom_selection, exchange_probability, n_replica))
[docs]def num_cpu(msj): """ """ cpu = msj.cpu.val n = 1 for e in (cpu if (isinstance(cpu, list)) else [ cpu, ]): n *= e return n
[docs]def get_fep_lambdas(n_win, fep_type, scheme): """ return default lambdas """ if scheme == "flexible": scheme = "default" return fep_schedule.get_fep_schedule(n_win, fep_type, scheme).get_lambda()
def __set_fep(msj, cfg, model): """ """ if (model is None): return sys_type = cms.get_model_system_type(model.comp_ct) fep_type = "alchemical" if ( sys_type == constants.SystemType.ALCHEMICAL) else "binding" cfg.force.nonbonded.near.type.val = fep_type + ":softcore" far_type = cfg.force.nonbonded.far.type.val if (fep_type == "binding" and far_type in [ "pme", "gse", ]): cfg.force.nonbonded.far.type.val = "binding:" + far_type if sys_type == constants.SystemType.OTHER: return if ("gibbs" not in cfg.force.term.list.val): cfg.force.term.list.append("gibbs") gibbs = cfg.force.term.gibbs gibbs.type.val = fep_type gibbs.output.name.val = msj.fep.output.name.val gibbs.output.first.val = msj.fep.output.first.val gibbs.output.interval.val = msj.fep.output.interval.val if (msj.fep.i_window.val is not None): gibbs.window.val = msj.fep.i_window.val if (isinstance(msj.fep.lambda_, sea.Atom)): scheme, n_win = parse_lambda(msj.fep.lambda_.val) if (n_win < 4): raise ValueError("Total number of windows should be 4 at least") fs = fep_schedule.get_fep_schedule(n_win, fep_type, scheme).get_lambda() la = gibbs.weights la["vdw"] = fs.vdw la["es"] = fs.coulomb la["vdwA"] = fs.vdwA la["vdwB"] = fs.vdwB la["qA"] = fs.chargeA la["qB"] = fs.chargeB la["qC"] = [1 - eA - eB for eA, eB in zip(fs.chargeA, fs.chargeB)] la["bondA"] = fs.bondedA la["bondB"] = fs.bondedB la["N"] = n_win else: if (fep_type != lambda_type(msj.fep.lambda_)): raise ValueError("Wrong lambda schedule type") if (fep_type == "alchemical"): fs = msj.fep.lambda_ la = gibbs.weights la["vdwA"] = fs.vdwA la["vdwB"] = fs.vdwB la["qA"] = fs.chargeA la["qB"] = fs.chargeB la["qC"] = [ 1 - eA.val - eB.val for eA, eB in zip(fs.chargeA, fs.chargeB) ] la["bondA"] = fs.bondedA la["bondB"] = fs.bondedB la["N"] = len(fs.vdwA) else: fs = msj.fep.lambda_ la = gibbs.weights la["vdw"] = fs.vdw la["es"] = fs.coulomb la["N"] = len(fs.vdw) def __set_meta_file(msj, cfg, model): """ """ if msj.meta.val == 'FILE': with open(msj.meta_file.val) as fh: m_expr = fh.read() from schrodinger.application.desmond.enhsamp import parseStr s_expr = parseStr(model, m_expr) cfg.force.term.list.append("ES") cfg.force.term.ES = sea.Map(s_expr) def __set_meta(msj, cfg, model): """ """ if not model or isinstance(msj.meta, sea.Atom) or len(msj.meta.cv) == 0: return # Converts the ASL expressions in the `cv[*].atom' list to lists of atoms. meta = deepcopy(msj.meta) for cv in meta.cv: new_atom = sea.List() for atom in cv.atom: atom_list = model.select_atom(atom.val) new_atom.append(atom_list) cv["atom"] = new_atom from schrodinger.application.desmond.meta import generate_meta_cfg cfg.force.term.list.append("ES") cfg.force.term["ES"] = sea.Map(generate_meta_cfg(meta, model)) last = meta.last.val if ("last" in meta) else "inf" for e in cfg.force.term.ES.metadynamics_accumulators: e["last"] = last def __set_cutoff_radius(msj, cfg, model): """ """ cfg.force.nonbonded.r_cut.val = msj.cutoff_radius.val def __set_bigger_rclone(msj, cfg, model): """ """ def __set_taper(msj, cfg, model): """ """ default_taper_setting = sea.Map("method = shift width = 1.0") if (isinstance(msj.taper, sea.Atom)): if (msj.taper.val): cfg.force.nonbonded.near.taper.val = default_taper_setting.method.val cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - default_taper_setting.width.val else: cfg.force.nonbonded.near.taper.val = "none" cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val else: custom_taper_setting = msj.taper msj.taper = default_taper_setting msj.taper.update(custom_taper_setting) cfg.force.nonbonded.near.taper.val = msj.taper.method.val cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - msj.taper.width.val if (cfg.force.nonbonded.near.r_tap.val < 0): raise ValueError("Invalid value for 'r_tap': %f" % cfg.force.nonbonded.near.r_tap.val) def __set_coulomb_method(msj, cfg, model): """ """ if (isinstance(msj.coulomb_method, sea.Atom)): if (msj.coulomb_method.val in [ "useries", "pme", ]): cfg.force.nonbonded.far.type.val = "pme" else: cfg.force.nonbonded.far.type.val = "none" cfg.force.nonbonded.sigma.val = "inf" else: method_name, ewald_tol = msj.coulomb_method.val if (method_name == "pme"): cfg.force.nonbonded.far.type.val = "pme" cfg.gui.ewald_tol.val = ewald_tol else: raise ValueError("Unknown coulomb method: %s" % method_name) def __set_temperature(msj, cfg, model): """ """ if ("annealing" in msj and msj.annealing.val): cfg.mdsim.plugin.anneal.schedule["time"] = sea.List() cfg.mdsim.plugin.anneal.schedule["value"] = sea.List() time = cfg.mdsim.plugin.anneal.schedule.time value = cfg.mdsim.plugin.anneal.schedule.value for e in msj.temperature: value.append(e[0].val) time.append(e[1].val) else: if (isinstance(msj.temperature, sea.Atom)): # FIXME: Here we consider only the case of 1 thermostat. temp = sea.Map() temp["T_ref"] = msj.temperature.val cfg.integrator["temperature"] = temp if not isinstance(cfg.integrator.Multigrator.barostat, sea.Atom): cfg.integrator.Multigrator.barostat.MTK.T_ref.val = msj.temperature.val else: temperature = sea.Map() if (1 < len(msj.temperature)): t_list = sea.List() for e in msj.temperature: temp = sea.Map() temp["T_ref"] = e[0].val temp["groups"] = sea.List([ e[1].val, ]) t_list.append(temp) temperature["T_groups"] = t_list temperature["T_ref"] = msj.temperature[0][0].val cfg.integrator["temperature"] = temperature def __set_annealing(msj, cfg, model): """ """ if (msj.annealing.val): add_plugin(cfg.mdsim, "anneal") else: remove_plugin(cfg.mdsim, "anneal") def __set_pressure(msj, cfg, model): """ """ if (isinstance(msj.pressure, sea.Atom)): cfg.integrator.pressure.P_ref.val = msj.pressure.val else: cfg.integrator.pressure.P_ref.val = msj.pressure[0].val if (isinstance(msj.ensemble, sea.Atom)): if (msj.ensemble.val in [ "NPT", "NPT_Ber", "NPT_L", ]): cfg.integrator.pressure.isotropy.val = msj.pressure[1].val else: if (msj.ensemble.class_.val == "NPT"): cfg.integrator.pressure.isotropy.val = msj.pressure[1].val def __set_surface_tension(msj, cfg, model): """ """ if ((isinstance(msj.ensemble, sea.Atom) and msj.ensemble.val == "NPgT") or (isinstance(msj.ensemble, sea.Map) and msj.ensemble.class_.val == "NPgT")): cfg.integrator.pressure["tension_ref"] = msj.surface_tension.val else: if ("tension_ref" in cfg.integrator.pressure): del cfg.integrator.pressure["tension_ref"]
[docs]def get_ensemble_class_method(msj): """ """ if (isinstance(msj.ensemble, sea.Atom)): ens_type_map = { "NPT": ( "NPT", "MTK", ), "NVT": ( "NVT", "NH", ), "NVE": ( "NVE", "NH", ), # Method for "NVE" will be ignored. "NPgT": ( "NPgT", "MTK", ), "NPAT": ( "NPAT", "MTK", ), "NPT_Ber": ( "NPT", "Berendsen", ), "NVT_Ber": ( "NVT", "Berendsen", ), "NPT_L": ( "NPT", "Langevin", ), "NVT_L": ( "NVT", "Langevin", ), } return ens_type_map[msj.ensemble.val] else: return ( msj.ensemble.class_.val, msj.ensemble.method.val, )
def __set_ensemble(msj, cfg, model): """ """ integrator = cfg.integrator ens = "NVE" tau = None if (isinstance(msj.ensemble, sea.Atom)): ens_type_map = { "NPT": "MTK_NPT", "NVT": "NH_NVT", "NVE": "V_NVE", "NPgT": "MTK_NPT", "NPAT": "MTK_NPT", "NPT_Ber": "Ber_NPT", "NVT_Ber": "Ber_NVT", "NPT_L": "L_NPT", "NVT_L": "L_NVT", "NPT_Brownie": "brownie_NPT", "NVT_Brownie": "brownie_NVT", "NVT_DPD": "DPD_NVT" } key = msj.ensemble.val ens = key else: if (msj.ensemble.class_.val == "NVE"): ens_type_map = { "NVE": "V_NVE", } key = msj.ensemble.class_.val else: # FIXME: Check the consistency between the "class" and "method" parameters when job is launched. ens_type_map = { ( "NPT", "MTK", ): "MTK_NPT", ( "NPgT", "MTK", ): "MTK_NPT", ( "NPAT", "MTK", ): "MTK_NPT", ( "NVT", "NH", ): "NH_NVT", ( "NPT", "Berendsen", ): "Ber_NPT", ( "NPgT", "Berendsen", ): "Ber_NPT", ( "NPAT", "Berendsen", ): "Ber_NPT", ( "NVT", "Berendsen", ): "Ber_NVT", ( "NPT", "Langevin", ): "L_NPT", ( "NPgT", "Langevin", ): "L_NPT", ( "NPAT", "Langevin", ): "L_NPT", ( "NVT", "Langevin", ): "L_NVT", ( "NPT", "Brownie", ): "brownie_NPT", ( "NVT", "Brownie", ): "brownie_NVT", } ens = msj.ensemble.class_.val method = msj.ensemble.method.val ttau = msj.ensemble.thermostat.tau.val if ( "thermostat" in msj.ensemble and method != "Brownie") else None btau = msj.ensemble.barostat.tau.val if ("barostat" in msj.ensemble) else None key = ( ens, method, ) tau = ( ttau, btau, ) t = ens_type_map[key] if (ens == "NPgT"): integrator.pressure.isotropy.val = "semi_isotropic" elif (ens == "NPAT"): integrator.pressure.isotropy.val = "constant_area" if (t in [ "V_NVE", ]): integrator.Multigrator["thermostat"] = "none" if (t in [ "NH_NVT", "Ber_NVT", "L_NVT", "V_NVE", ]): integrator.Multigrator["barostat"] = "none" if (tau is not None): # tau[0] is for thermostat, tau[1] for barostat. if (t in [ "MTK_NPT", "NH_NVT", ]): if (tau[0]): ts = integrator.Multigrator.thermostat.timesteps.val for e in integrator.Multigrator.thermostat.NoseHoover.tau: e.val = old_div(tau[0], float(ts)) if (t == "MTK_NPT"): ts = integrator.Multigrator.barostat.timesteps.val if (tau[0]): for e in integrator.Multigrator.barostat.MTK.thermostat.NoseHoover.tau: e.val = old_div(tau[0], float(ts)) if (tau[1]): integrator.Multigrator.barostat.MTK.tau.val = old_div( tau[1], float(ts)) elif (t in [ "Ber_NPT", "Ber_NVT", ]): if (tau[0]): integrator[t].tau.val = tau[0] if (t == "Ber_NPT" and tau[1]): integrator.Ber_NPT.barostat.tau.val = tau[1] elif t == "L_NVT": integrator.Multigrator.thermostat.type.val = "Langevin" if tau[0] is not None: ts = integrator.Multigrator.thermostat.timesteps.val integrator.Multigrator.thermostat.Langevin.tau.val = old_div( tau[0], float(ts)) elif t == "L_NPT": integrator.Multigrator.thermostat.type.val = "Langevin" integrator.Multigrator.barostat.MTK.thermostat.type.val = "Langevin" if tau[0] is not None: ts = integrator.Multigrator.thermostat.timesteps.val integrator.Multigrator.thermostat.Langevin.tau.val = old_div( tau[0], float(ts)) integrator.Multigrator.barostat.MTK.thermostat.Langevin.tau.val = old_div( tau[0], float(ts)) if tau[1] is not None: ts = integrator.Multigrator.barostat.timesteps.val integrator.Multigrator.barostat.MTK.tau.val = old_div( tau[1], float(ts)) elif (t == "brownie_NVT"): integrator.brownie_NVT.delta_max.val = msj.ensemble.brownie.delta_max.val elif (t == "brownie_NPT"): if (tau[1] is not None): integrator.brownie_NPT.barostat.tau.val = tau[1] integrator.brownie_NPT.delta_max.val = msj.ensemble.brownie.delta_max.val if (isinstance(msj.temperature, sea.Atom)): integrator.brownie_NPT.barostat.T_ref.val = msj.temperature.val else: integrator.brownie_NPT.barostat.T_ref.val = msj.temperature[0][ 0].val if t in ["Ber_NPT", "Ber_NVT", "brownie_NVT", "brownie_NPT"]: integrator.type.val = t desmond_exec = get_exec(msj, cfg) if (t in [ "Ber_NPT", "Ber_NVT", ]): cfg.force.ignore_com_dofs.val = True desmond_exec.plugin.remove_com_motion.first.val = 0.0 desmond_exec.plugin.remove_com_motion.interval.val = 0.0 def __set_time(msj, cfg, model): """ """ desmond_exec = get_exec(msj, cfg) desmond_exec.last_time.val = msj.time.val def __set_elapsed_time(msj, cfg, model): """ """ cfg.global_cell.reference_time.val = msj.elapsed_time.val def __set_timestep(msj, cfg, model): """ """ timestep = msj.timestep.val cfg.integrator.dt.val = timestep[0] cfg.integrator.respa.near_timesteps.val = int( old_div(timestep[1], timestep[0]) + 0.1) cfg.integrator.respa.far_timesteps.val = int( old_div(timestep[2], timestep[0]) + 0.1) cfg.integrator.respa.outer_timesteps.val = int( old_div(timestep[2], timestep[0]) + 0.1) def __set_cpu(msj, cfg, model): """ """ if (model is None): return if (isinstance(msj.cpu, sea.Atom)): import schrodinger.application.desmond.autopartition as autopartition x, y, z = cms.get_boxsize(model.box) nx, ny, nz = autopartition.auto_partition([ x, y, z, ], msj.cpu.val) else: nx, ny, nz = msj.cpu.val cfg.global_cell["partition"] = ( nx, ny, nz, ) def __set_glue(msj, cfg, model): """ """ def __set_vrun_frameset(msj, cfg, model): """ """ cfg.vrun.frameset.name.val = msj.vrun_frameset.val def __set_reinit_frameset(msj, cfg, model): for name in ('first', 'interval', 'configurations_file'): cfg.reinit.frameset[name].val = msj.reinit_frameset[name].val def __set_trajectory(msj, cfg, model): """ """ if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.trajectory, sea.Atom)): remove_plugin(desmond_exec, "trajectory") return desmond_exec.plugin.trajectory.name.val = msj.trajectory.name.val desmond_exec.plugin.trajectory.first.val = msj.trajectory.first.val desmond_exec.plugin.trajectory.interval.val = msj.trajectory.interval.val desmond_exec.plugin.trajectory.periodicfix.val = msj.trajectory.periodicfix.val desmond_exec.plugin.trajectory.write_velocity.val = msj.trajectory.write_velocity.val try: desmond_exec.plugin.trajectory.frames_per_file.val = msj.trajectory.frames_per_file.val except AttributeError: pass try: center = msj.trajectory.center if (isinstance(center, sea.List)): desmond_exec.plugin.trajectory.center.val = [ model.gid(e) for e in center.val ] else: desmond_exec.plugin.trajectory.center.val = [ model.gid(e) for e in model.select_atom(center.val) ] except AttributeError: pass def __set_eneseq(msj, cfg, model): """ """ desmond_exec = get_exec(msj, cfg) if (isinstance(msj.eneseq, sea.Atom)): remove_plugin(desmond_exec, "eneseq") return desmond_exec.plugin.eneseq.name.val = msj.eneseq.name.val desmond_exec.plugin.eneseq.first.val = msj.eneseq.first.val desmond_exec.plugin.eneseq.interval.val = msj.eneseq.interval.val try: desmond_exec.plugin.eneseq["precision"] = msj.eneseq.precision.val except AttributeError: pass def __set_checkpt(msj, cfg, model): """ """ if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.checkpt, sea.Atom)): if (msj.checkpt.val): __set_checkpt(get_default_setting(msj), cfg, model) else: desmond_exec.checkpt.first.val = "inf" desmond_exec.checkpt.interval.val = "inf" desmond_exec.checkpt.wall_interval.val = "inf" desmond_exec.checkpt.write_last_step.val = False else: desmond_exec.checkpt.name.val = msj.checkpt.name.val desmond_exec.checkpt.write_last_step.val = msj.checkpt.write_last_step.val if ("wall_interval" in msj.checkpt and desmond_exec.checkpt.wall_interval.val != float("inf")): desmond_exec.checkpt.first.val = "inf" desmond_exec.checkpt.interval.val = "inf" desmond_exec.checkpt.wall_interval.val = msj.checkpt.wall_interval.val else: desmond_exec.checkpt.first.val = msj.checkpt.first.val desmond_exec.checkpt.interval.val = msj.checkpt.interval.val desmond_exec.checkpt.wall_interval.val = "inf" def __set_maeff_output(msj, cfg, model): """ """ desmond_exec = get_exec(msj, cfg) if (isinstance(msj.maeff_output, sea.Atom)): remove_plugin(desmond_exec, "maeff_output") return desmond_exec.plugin.maeff_output.name.val = msj.maeff_output.name.val desmond_exec.plugin.maeff_output.first.val = msj.maeff_output.first.val desmond_exec.plugin.maeff_output.interval.val = msj.maeff_output.interval.val try: # If user runs desmond directly without multisim and if the center_atoms # is not set in the input .cfg file, we will get an AttributeError. desmond_exec.plugin.maeff_output[ "center_atoms"] = msj.maeff_output.center_atoms.val except AttributeError: desmond_exec.plugin.maeff_output[ "center_atoms"] = DEFAULT_SETTING.MD.maeff_output.center_atoms.val if is_minimize(msj): desmond_exec.plugin.maeff_output.trjdir.val = "" else: # read trjidx key to support front-end cfg files generated with # prior versions (<15-4) if 'trjidx' in msj.maeff_output: desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjidx.val.replace( '-out.idx', '_trj') else: desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjdir.val if ("periodicfix" in msj.maeff_output): desmond_exec.plugin.maeff_output.periodicfix.val = msj.maeff_output.periodicfix.val def __set_randomize_velocity(msj, cfg, model): """ """ if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.randomize_velocity, sea.Atom)): if (msj.randomize_velocity.val): __set_randomize_velocity(get_default_setting(msj), cfg, model) else: remove_plugin(desmond_exec, "randomize_velocity") else: desmond_exec.plugin.randomize_velocities.first.val = msj.randomize_velocity.first.val desmond_exec.plugin.randomize_velocities.interval.val = msj.randomize_velocity.interval.val desmond_exec.plugin.remove_com_motion.first.val = msj.randomize_velocity.first.val desmond_exec.plugin.remove_com_motion.interval.val = msj.randomize_velocity.interval.val try: desmond_exec.plugin.randomize_velocities.seed.val = msj.randomize_velocity.seed.val except AttributeError: pass temp = msj.randomize_velocity.temperature.val if (isinstance(temp, list)): desmond_exec.plugin.randomize_velocities.temperature.val = temp[0][ 0] else: desmond_exec.plugin.randomize_velocities.temperature.val = temp def __set_simbox(msj, cfg, model): """ """ if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.simbox, sea.Atom)): if (msj.simbox.val): __set_simbox(get_default_setting(msj), cfg, model) else: remove_plugin(desmond_exec, "simbox_output") else: desmond_exec.plugin.simbox_output.name.val = msj.simbox.name.val desmond_exec.plugin.simbox_output.first.val = msj.simbox.first.val desmond_exec.plugin.simbox_output.interval.val = msj.simbox.interval.val add_plugin(desmond_exec, "simbox_output")
[docs]def get_simbox_output_filename(msj): """ Returns 'None' if the simbox_output plugin is turned off. """ if (is_minimize(msj)): return None if (isinstance(msj.simbox, sea.Atom)): if (sea.simbox.val): return get_simbox_output_filename(get_default_setting(msj)) else: return msj.simbox.name.val
def __set_energy_group(msj, cfg, model): """ """ default_energy_group_setting = sea.Map(""" name = "$JOBNAME$[_replica$REPLICA$]_enegrp.dat" first = 0.0 interval = 1.2 self_energy = false corr_energy = true """) desmond_exec = get_exec(msj, cfg) if (isinstance(msj.energy_group, sea.Atom)): if (msj.energy_group.val): msj["energy_group"] = default_energy_group_setting __set_energy_group(msj, cfg, model) else: remove_plugin(desmond_exec, "energy_groups") else: custom_energy_group_setting = msj.energy_group msj.energy_group = default_energy_group_setting msj.energy_group.update(custom_energy_group_setting) desmond_exec.plugin.energy_groups.name.val = msj.energy_group.name.val desmond_exec.plugin.energy_groups.first.val = msj.energy_group.first.val desmond_exec.plugin.energy_groups.interval.val = msj.energy_group.interval.val eg = desmond_exec.plugin.energy_groups def set_options(term): if (term in msj.energy_group): if (msj.energy_group[term].val): eg.options.append(term) else: try: eg.options.remove(term) except ValueError: pass set_options("self_energy") set_options("corr_energy") add_plugin(desmond_exec, "energy_groups") def __set_pressure_tensor(msj, cfg, model): """ Set ptensor plugin and corresponding variables from MSJ pressure_tensor block. """ default_settings = sea.Map(""" name = "$JOBNAME$[_replica$REPLICA$].ptensor.gz" first = 0.0 interval = 1.2 print_volume = false """) desmond_exec = get_exec(msj, cfg) if (isinstance(msj.pressure_tensor, sea.Atom)): if (msj.pressure_tensor.val): msj["pressure_tensor"] = default_settings __set_pressure_tensor(msj, cfg, model) else: remove_plugin(desmond_exec, "ptensor") else: custom_settings = msj.pressure_tensor msj.pressure_tensor = default_settings msj.pressure_tensor.update(custom_settings) desmond_exec.plugin.ptensor.name.val = msj.pressure_tensor.name.val desmond_exec.plugin.ptensor.first.val = msj.pressure_tensor.first.val desmond_exec.plugin.ptensor.interval.val = msj.pressure_tensor.interval.val desmond_exec.plugin.ptensor.print_volume.val = msj.pressure_tensor.print_volume.val add_plugin(desmond_exec, "ptensor") def __set_dipole_moment(msj, cfg, model): """ Set dipole moment plugin and corresponding variables from MSJ block. """ default_settings = sea.Map(""" first = 0.0 interval = 1.2 name = "$JOBNAME$[_replica$REPLICA$]_dipole.dat" """) desmond_exec = get_exec(msj, cfg) if (isinstance(msj.dipole_moment, sea.Atom)): if (msj.dipole_moment.val): msj["dipole_moment"] = default_settings __set_dipole_moment(msj, cfg, model) else: remove_plugin(desmond_exec, "dipole_moment") else: custom_settings = msj.dipole_moment msj.dipole_moment = default_settings msj.dipole_moment.update(custom_settings) desmond_exec.plugin.dipole_moment.name.val = msj.dipole_moment.name.val desmond_exec.plugin.dipole_moment.first.val = msj.dipole_moment.first.val desmond_exec.plugin.dipole_moment.interval.val = msj.dipole_moment.interval.val add_plugin(desmond_exec, "dipole_moment")
[docs]def get_energy_group_output_filename(msj): """ Returns 'None' if the energy_groups plugin is turned off. """ if (isinstance(msj.energy_group, sea.Atom)): if (msj.energy_group.val): return get_energy_group_output_filename(get_default_setting(msj)) else: return msj.energy_group.name.val if ( msj.time.val > msj.energy_group.first.val) else None
[docs]def get_fep_output_filename(msj): """ """ if not is_minimize(msj) and not is_vrun(msj): try: if (msj.time.val > msj.fep.output.first.val): return msj.fep.output.name.val except AttributeError: pass
# For minimization def __set_max_steps(msj, cfg, model): """ """ cfg.minimize.maxsteps.val = msj.max_steps.val def __set_convergence(msj, cfg, model): """ """ cfg.minimize.tol.val = msj.convergence.val def __set_steepest_descent_steps(msj, cfg, model): """ """ cfg.minimize.sdsteps.val = msj.steepest_descent_steps.val def __set_num_vector(msj, cfg, model): """ """ cfg.minimize.m.val = msj.num_vector.val def __set_backend(msj, cfg, model): """ """ if ("backend" in msj and isinstance(msj.backend, sea.Map)): cfg.update(msj.backend.dval) # For remd
[docs]def gen_temperature_for_solute_tempering(setting, model, base_temp, asl, printer=None): """ """ from . import remd if (model is None): print( "WARNING: Cannot predict REMD temperature ladder without model system." ) return [ 300, 400, 500, 600, 700, 800, ] if ("highest_temperature" in setting and "exchange_probability" in setting): top_temp = setting.highest_temperature.val temp, prob = remd.predict_with_temp_and_exch(( base_temp, top_temp, ), setting.exchange_probability.val, model, asl) if ("highest_temperature" in setting and "n_replica" in setting): top_temp = setting.highest_temperature.val temp, prob = remd.predict_with_temp_and_nreplica(( base_temp, top_temp, ), setting.n_replica.val, model, asl) if ("exchange_probability" in setting and "n_replica" in setting): temp, prob = remd.predict_with_nreplica_and_exch( setting.n_replica.val, setting.exchange_probability.val, base_temp, model, asl) if (printer): printer("Temperature ladder (exchange probability):") for t, p in zip(temp, prob): printer(" %.3g (%.3g)" % ( t, p, )) printer(" %.3g (n/a)" % temp[-1]) return temp
[docs]def gen_replica(msj, model=None): """ """ replica = msj.replica generator = replica.generator.val if (generator in [ "solute_tempering", "parallel_tempering", ]): ref_temp = msj.temperature.val if (isinstance( msj.temperature, sea.Atom)) else msj.temperature[0][0].val ref_temp = float(ref_temp) asl = replica.atom.val if (generator == "solute_tempering") else "all" if (isinstance(replica.temperature, sea.Map)): rep_temp = gen_temperature_for_solute_tempering( replica.temperature, model, ref_temp, asl) else: rep_temp = replica.temperature.val replica = sea.List() if (ref_temp not in rep_temp): print( "WARNING: Reference temperature (%f) not found in replicas (%s)" % ( ref_temp, str(rep_temp), )) if (generator == "solute_tempering"): rep_fname = sea.Atom( "$JOBNAME_solute_tempering_replica%d-in.cms").val for i, e in enumerate(rep_temp): fname = rep_fname % i if (model): from . import remd remd.write_ff_solute_tempering(model, fname, asl, old_div(ref_temp, e)) replica.append(sea.Map("model_file = %s" % fname)) else: for e in rep_temp: replica.append(sea.Map("temperature = %.3g model_file = ?" % e)) else: raise ValueError("Unknown replica generator: %s" % replica.generator.val) return replica
def __set_replica(msj, cfg, model): """ """ msj = deepcopy(msj) replica_msj = [] if (is_remd(msj)): replica = msj.replica if (isinstance(replica, sea.Map)): replica = gen_replica(msj, model) if (not isinstance(replica, sea.List)): raise ValueError("Unknown type for 'replica' parameter: %s" % type(replica)) for e in replica: msj_ = deepcopy(msj) msj_.update(e) replica_msj.append(msj_) return ( msj, replica_msj, )
[docs]def get_replica_setting(msj): """ """ ref, replica = __set_replica(msj, None, None) return replica
def __set_model_file(msj, cfg, model): """ """ if ("model_file" in msj): desmond_exec = get_exec(msj, cfg) desmond_exec.plugin.maeff_output["bootfile"] = msj.model_file.val def __attach_msj(msj, cfg, model): """ """ cfg["ORIG_CFG"] = msj def __set_restrain(msj, cfg, model): """ restrain only used in gconfig.py for concatenator """ def __set_restraints(msj, cfg, model): """ restraints only used in gconfig.py for concatenator """ def __set_box(msj, cfg, model): pass def __optimize_cfg(msj, cfg, model): """ """ if (model): optimize_key(msj, cfg, model) # FIXME: replace optimize_key with forceconfig. def __clean_cfg(cfg): """ """ app_map = { "mdsim": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "remd", "vrun", ], "vrun": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "remd", "mdsim", ], "remd": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "vrun", "mdsim", ], "minimize": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "remd", "vrun", "mdsim", ], } for e in app_map[cfg.app.val]: if (e in cfg): del cfg[e] # Deletes unused plugin settings. desmond_exec = cfg[cfg.app.val] used_plugin = desmond_exec.plugin.list.val used_plugin += [ "list", "maeff_output", ] for e in desmond_exec.plugin: if (e not in used_plugin): del desmond_exec.plugin[e] # Deletes unused ensemble settings ens_list = [ "Ber_NPT", "Ber_NVT", "MTK_NPT", "NH_NVT", "V_NVE", "L_NVT", "L_NPT", "Piston_NPH", ] try: ens_index = ens_list.index(cfg.integrator.type.val) del ens_list[ens_index] except ValueError: pass for e in ens_list: del cfg.integrator[e] # Deletes the "gibbs" setting if it is not used. if ("gibbs" not in cfg.force.term.list.val and "gibbs" in cfg.force.term): del cfg.force.term["gibbs"] def __fix_cfg(msj, cfg): """ """ # Special treatment for annealing if ("annealing" in msj and msj.annealing.val): ens_type = cfg.integrator.type.val ens = cfg.integrator[ens_type] del cfg.integrator[ens_type] cfg.integrator.type.val = "anneal" cfg.integrator["anneal"] = sea.Map("type = " + ens_type) cfg.integrator.anneal[ens_type] = ens # Special treatment for force.nonbonded.far if (cfg.force.nonbonded.far.type.val == "none"): cfg.force.nonbonded["far"] = "none" def __msj2cfg_helper(msj, cfg, model, macro): """ """ sea.set_macro_dict(macro) cfg = sea.Map(DEFAULT_CONFIG) if (cfg is None) else cfg cfg = cfg.dval this_mod = globals() for k in list(msj): if (k not in [ "replica", "backend", ]): this_mod["__set_" + k](msj, cfg, model) return cfg.bval
[docs]def msj2cfg(msj, cfg, model, macro=None, is_gdesmond=False): """ """ macro = macro or {} if (is_gdesmond): import schrodinger.application.desmond.gconfig as gconfig return gconfig.msj2cfg(msj, cfg, model, macro) if 'simulate' in msj: # assumed to be a concatenate config # TODO: eliminate this code -- it should not be needed # with CPU desmond gone (still called from tests though) import schrodinger.application.desmond.gconfig as gconfig simulate_cfgs = [] for msj_ in msj.simulate: simulate_cfg = msj2cfg(msj_, deepcopy(cfg), model, macro) simulate_cfg.restraints = cmj.get_restraints_xor_convert_restrain( msj_) simulate_cfgs.append(simulate_cfg) base_concat_msj = deepcopy(msj) del base_concat_msj['simulate'] base_concat_cfg = msj2cfg(base_concat_msj, deepcopy(cfg), model, macro) return gconfig.concatenate_cfgs(simulate_cfgs, base_concat_cfg) msj_ref, msj_replica = __set_replica(msj, cfg, model) cfg_ref = __msj2cfg_helper(msj_ref, deepcopy(cfg), model, macro) cfg_replica = [] for i, e in enumerate(msj_replica): macro["$REPLICA"] = str(i) cfg_replica.append(__msj2cfg_helper(e, deepcopy(cfg), model, macro)) cfg_ref.app.val = get_exec_name(msj) __optimize_cfg(msj_ref, cfg_ref, model) __set_backend(msj_ref, cfg_ref, model) cfg_ref = cfg_ref.bval cfg_rep = [] for m, c in zip(msj_replica, cfg_replica): c.app.val = get_exec_name(msj) if (m.model_file.val): __optimize_cfg(m, c, cms.Cms(file=m.model_file.val)) else: __optimize_cfg(m, c, model) __set_backend(m, c, model) cfg_rep.append(c.bval) cfg_diff = [] for e in cfg_rep: changed, referred, added, lost = sea.diff(e, cfg_ref) change = changed.update(added) if ("mdsim" in change): del change["mdsim"] if ("minimize" in change): del change["minimize"] if ("vrun" in change): del change["vrun"] if ("remd" in change): remd = change.remd del change["remd"] change.update(remd) cfg_diff.append(change) for e in cfg_diff: cfg_ref.remd.cfg.append(e) __clean_cfg(cfg_ref) __fix_cfg(msj, cfg_ref) __attach_msj(msj, cfg_ref, model) return cfg_ref
[docs]def translate_output_file_names_helper(msj, macro_dict): sea.set_macro_dict(macro_dict) cfg = sea.Map() try: desmond_exec = cfg[get_exec_name(msj)] except KeyError: cfg[get_exec_name(msj)] = sea.Map() desmond_exec = cfg[get_exec_name(msj)] for k in list(msj): if (k == "fep"): cfg.set_value("force.term.gibbs.output.name", msj.fep.output.name.val) elif (k == "trajectory"): if (not (is_minimize(msj) or isinstance(msj.trajectory, sea.Atom))): desmond_exec.set_value("plugin.trajectory.name", msj.trajectory.name.val) elif (k == "eneseq"): if (not isinstance(msj.eneseq, sea.Atom)): desmond_exec.set_value("plugin.eneseq.name", msj.eneseq.name.val) elif (k == "checkpt"): if (not (is_minimize(msj))): if (isinstance(msj.checkpt, sea.Atom)): if (msj.checkpt.val): desmond_exec.set_value("checkpt.name", "$JOBNAME.cpt") else: desmond_exec.set_value("checkpt.name", msj.checkpt.name.val) elif (k == "maeff_output"): if (not isinstance(msj.maeff_output, sea.Atom)): desmond_exec.set_value("plugin.maeff_output.name", msj.maeff_output.name.val) desmond_exec.set_value("plugin.maeff_output.trjdir", msj.trajectory.name.val) elif (k == "simbox"): if (not is_minimize(msj)): if (isinstance(msj.simbox, sea.Atom)): if (msj.simbox.val): desmond_exec.set_value( "plugin.simbox_output.name", "$JOBNAME$[_replica$REPLICA$]_simbox.dat") else: desmond_exec.set_value("plugin.simbox_output.name", msj.simbox.name.val) elif (k == "energy_group"): if (isinstance(msj.energy_group, sea.Atom)): if (msj.energy_group.val): desmond_exec.set_value( "plugin.energy_groups.name", "$JOBNAME$[_replica$REPLICA$]_enegrp.dat") else: desmond_exec.set_value("plugin.energy_groups.name", msj.energy_group.name.val) elif (k == "meta"): if (not (isinstance(msj.meta, sea.Atom) or len(msj.meta.cv) == 0)): #FIXME: uncomment below statements when the backend can reconfigure the file names #DESMOND-2772 #cfg["force.term.ES.name"] = msj.meta.cv_name.val #cfg["force.term.ES.metadynamics_accumulators.name"] = msj.meta.name.val pass elif (k == "vrun_frameset"): cfg.set_value("vrun.frameset.name", msj.vrun_frameset.val) return cfg.bval
[docs]def translate_output_file_names(msj, macro_dict, num_replica=1): """ """ msj_ref = deepcopy(msj) replica_msj = [] if (is_remd(msj)): replica = msj.replica if (isinstance(replica, sea.Map)): replica = sea.List() for i in len(num_replica): replica.append(sea.Map()) if (not isinstance(replica, sea.List)): raise ValueError("Unknown type for 'replica' parameter: %s" % type(replica)) for e in replica: msj_ = deepcopy(msj) msj_.update(e) replica_msj.append(msj_) cfg_ref = translate_output_file_names_helper(msj, macro_dict) cfg_replica = [] for i, e in enumerate(replica_msj): macro_dict["$REPLICA"] = str(i) cfg_replica.append(translate_output_file_names_helper(msj, macro_dict)) __set_backend(msj_ref, cfg_ref, None) cfg_ref = cfg_ref.bval cfg_rep = [] for m, c in zip(replica_msj, cfg_replica): __set_backend(m, c, None) cfg_rep.append(c.bval) cfg_diff = [] for e in cfg_rep: changed, referred, added, lost = sea.diff(e, cfg_ref) change = changed.update(added) if ("mdsim" in change): del change["mdsim"] if ("minimize" in change): del change["minimize"] if ("vrun" in change): del change["vrun"] if ("remd" in change): remd = change.remd del change["remd"] change.update(remd) cfg_diff.append(change) for e in cfg_diff: try: cfg_ref.remd.cfg.append(e) except AttributeError: cfg_ref.remd["cfg"] = sea.List() cfg_ref.remd.cfg.append(e) return cfg_ref
def __canonicalize_fep(msj): """ """ # Canonicalization of "fep.lambda" depends on the model system, so we temporarily don't do it for "fep.lambda". def __canonicalize_temperature(msj): """ """ if (isinstance(msj.temperature, sea.Atom)): msj["temperature"] = sea.List([ [ msj.temperature.val, 0, ], ]) def __canonicalize_pressure(msj): """ """ if (isinstance(msj.pressure, sea.Atom)): msj["pressure"] = sea.List([ msj.pressure.val, "isotropic", ]) def __canonicalize_ensemble(msj): """ """ if (isinstance(msj.ensemble, sea.Atom)): ens_class, method = get_ensemble_class_method(msj) msj["ensemble"] = sea.Map( "class = %s method = %s thermostat.tau = 1.0 barostat.tau = 2.0" % ( ens_class, method, )) def __canonicalize_coulomb_method(msj): """ """ if (isinstance(msj.coulomb_method, sea.Atom) and msj.coulomb_method.val == "pme"): msj["coulomb_method"] = [ "pme", 1E-9, ]
[docs]def canonicalize(msj): """ """ PARAM_TO_CANONICALIZE = [ "fep", "temperature", "pressure", "ensemble", "coulomb_method", ] msj_copy = deepcopy(msj) this_mod = globals() for e in PARAM_TO_CANONICALIZE: if (e in msj): this_mod["__canonicalize_" + e](msj_copy) return msj_copy
if ("__main__" == __name__): # for n_win in range( 26, 26, 2 ) : # print "default:%d" % n_win # __get_fep_schedule( n_win, "binding", "default" )._print() # print "quickcharge:%d" % n_win # __get_fep_schedule( n_win, "binding", "quickcharge" )._print() # print "superquickcharge:%d" % n_win # __get_fep_schedule( n_win, "binding", "superquickcharge" )._print() # print "\n" # for n_win in range( 6, 26, 2 ) : # print "default:%d" % n_win # __get_fep_schedule( n_win, "alchemical", "default" )._print() # print "quickcharge:%d" % n_win # __get_fep_schedule( n_win, "alchemical", "quickcharge" )._print() # print "superquickcharge:%d" % n_win # __get_fep_schedule( n_win, "alchemical", "superquickcharge" )._print() # print "\n" #canonicalize( DEFAULT_SETTING.MD ) #remd = sea.Map( REMD ) #print sea.check_map( remd.DATA, remd.VALIDATE ) #print md.DATA.bval #msj = DEFAULT_SETTING.META #model = cms.Cms( file = "test-out.cms" ) #msj.meta.cv.append( sea.Map( "type = dist atom = [1 2] width = 0.8 wall = 9.0" ) ) #cfg = msj2cfg( msj, None, model, {"$JOBNAME" : "test",} ) import sys msj = sea.Map(open(sys.argv[1], "r").read()) cfg = translate_output_file_names(msj, {"$JOBNAME": "test"}) print(cfg) #model = cms.Cms( file = sys.argv[2] ) #cfg = msj2cfg( msj, None, model, {"$JOBNAME" : "test",}, is_gdesmond = True ) #cfg = msj2cfg( msj, None, model, {"$JOBNAME" : "test",} ) #print cfg #optimize_key( cfg, model ) # remd.DATA.replica[0].temperature.val = 100.0 # remd.DATA.replica[1].temperature.val = 200.0 # print msj2cfg( remd.DATA, None, model, {"$JOBNAME" : "test",} )