Source code for schrodinger.application.desmond.gconfig

"""
Utilities for handling gDesmond config files.

Copyright Schrodinger, LLC. All rights reserved.
"""

# Contributors: Yujie Wu

from copy import deepcopy
from past.utils import old_div
from typing import List

import numpy as np
import scipy.interpolate

import schrodinger.application.desmond.cms as cms
import schrodinger.application.desmond.config as config
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond.constants import GCMC_BATCH_SIZE_DEFAULT
from schrodinger.application.desmond.constants import SystemType
from schrodinger.structure import StructureReader

DEFAULT_CONFIG = """{
app       = mdsim    # mdsim | remd | vrun | reinit
boot.file = filename

global_cell = {
   reference_time       = 0.0
   topology             = periodic
   n_replica            = 1
   partition            = [1 1 1]
   r_clone              = 5.903366
   clone_policy         = rounded
   margin               = 1.0
}

migration = {
   first    = 0.0
   interval = 0.018
}

spatial_order = auto                  # cells | conpablo
#spatial_order = {
#   style    = foxels
#   indexing = snake                     # snake | linear
#}

integrator = {
   type             = Multigrator
   dt               = 0.2
   respa            = {
      near_timesteps  = 1
      far_timesteps   = 2
      outer_timesteps = 3
      migrate_timesteps = 9
   }

   temperature = {
      T_ref            = 300.0
      use_molecular_KE = false
   }

   pressure = {
      isotropy = isotropic             # isotropic | semi_isotropic | anisotropic | constant_area | flexible | monoclinic
      P_ref    = 1.01325
      tension_ref  = 0
      max_margin_contraction = 0.9
   }

   Multigrator = {
      thermostat = {                   # none or a map
         type      = NoseHoover        # Langevin | NoseHoover | DPD
         timesteps = 12
         Langevin  = {
            tau  = 0.016129            # Gamma = 1/tau = 62 is default -- collision rate of water.
            seed = 2012
         }
         NoseHoover = {
            mts  = 2
            tau  = [0.1 0.1 0.1]
         }
         DPD  = {
           seed = 2012
        }

      }
      barostat = {                     # none or a map
         type      = MTK
         timesteps = 48
         temperature = {
            T_ref            = 300.0
            use_molecular_KE = false
         }
         MTK       = {
            tau        = 0.05
            thermostat = {
               type       = NoseHoover
               NoseHoover = {
                  mts  = 2
                  tau  = [0.1 0.1 0.1]
               }
            }
         }
         Langevin  = {
            tau        = 0.020833333      # Gamma = 1/tau = 48 ps
            thermostat = {
               type = Langevin
               tau  = 0.016129            # Gamma = 1/tau = 62 is default -- collision rate of water.
               seed = 2012
            }
         }
      }
      nve.type = Verlet
   }

   brownie = {
      delta_max  = 0.1
      thermostat = {
         seed = 2012
      }
      barostat   = {                   # none or a map
         delta_max  = "@*.delta_max"
         tau        = 1.0
         thermostat = "@*.thermostat"
      }
   }

   brownie_NVT = {
      thermostat.seed = 2014
      delta_max       = 0.1
   }

   brownie_NPT = {
      thermostat.seed = 2014
      delta_max       = 0.1
      barostat = {
         thermostat.seed = 2014
         tau   = 0.016129
         T_ref = 300.0
      }
   }
   posre_scaling = 1.0
}

force = {
   nonbonded = {
      type   = useries                 # ewald | useries
      sigma  = 2.791856                # only needed when force.type = ewald
      r_cut  = 9.0
      r_lazy = 10.0
      n_zone = 1024                    # only needed when force.type = ewald
      accuracy_level = 0               # only needed when force.type = useries

      near = {
         type  = minimax               # default | force-only | table | alchemical:softcore | binding:softcore
                                       # minimax | alchemical:softcore-minimax | binding:softcore-minimax
                                       # use minimax's with useries
         r_tap = 9.0
         taper = none                  # none | shift | c1switch | c2switch
        #average_dispersion = 69.5     # In kcal/mol A^6. If not set, it will be caculated by Desmond.
         correct_average_dispersion = true
      }

      far = {
         Nterms   = 32
         type     = QuadS              # pme | pme-cpu | binding:pme | QuadS
         sigma_s  = 0.85
         r_spread = 4.0
         n_k      = [16 16 16]
         order    = [4 4 4]
         kappa    = [0.333333 0.333333 0.333333]
         spreading_style = scatter_gather    # scatter_gather | scatter
      }
   }

   bonded  = {
      exclude = []
      include = []
   }

   virtual = {
      exclude = []
      include = []
   }

   ignore_com_dofs = true              # subtract 3 from the degrees of freedom.

   term = {
      list  = []
      gibbs = {
         type      =  none             # none | alchemical | binding
         alpha_vdw =  0.5
         window    =  -1               # window index
#         flexible = {}                # flexible lambdas
         weights   = {
            vdw   = []                 # binding
            es    = []                 # binding
            vdwA  = []                 # alchemical
            vdwB  = []                 # alchemical
            qA    = []                 # alchemical
            qB    = []                 # alchemical
            qC    = []                 # alchemical
            bondA = []                 # alchemical
            bondB = []                 # alchemical
         }
         output = {
            name     = "fep.dE"
            first    = 0.0
            interval = 0.1
         }
      }
   }
   constraint = {
      maxit = 16
      tol   = 1.0e-8
   }
}


PLUGIN_COMMON = {
   list = [status eneseq trajectory remove_com_motion]

   anneal = {
      type     = anneal
      first    = 0.0
      interval = 1.2
      schedule = {
         time  = [0.0 30.0  60.0  90.0  600.0]
         value = [0.0 300.0 600.0 900.0 300.0]
      }
   }

   status = {
      type     = status
      first    = 0.0
      interval = 1.2
   }

   eneseq = {
      type     = eneseq
      name     = "system.ene"
      first    =  0.0
      interval =  1.2
      flush_interval = "@interval"
   }

   gcmc = {
       type = gcmc
       first = 0.0
       name = "$JOBNAME$[_replica$REPLICA$].gcmc"
       eneseq = {
           name = "$JOBNAME$[_replica$REPLICA$]_gcmc.ene"
       }
       interval = 4.8
       nsteps = 5000
       # number of trans/rot moves to ins/del moves
       # .3333 = 1:2 = 1:1:1 ratio (trans:ins:del)
       # a:b = a:b/2:b/2 such that ins/del is fixed
       # move_fraction = 0.0
       mu_excess = -6.180
       solvent_density = 0.03262
       # number of trans/rot moves to perform on an inserted water
       # post_insertion_moves = 0
       grid = {
           track_voids = true
           spacing = .22
           exclusion_radius = 2.2
           region_buffer = 4.0
           global_switching = {
               frequency = 0.2
               move_factor = 3.0
               spacing_factor = 2.0
           }
       }
       verbose = 0
       batch_size = %s
       temperature = 300.0
       # for randomize velocities
       seed = 2007
       restore_engrps = false
   }

   trajectory = {
      type             = trajectory
      format           = dtr
      name             = "out_trj"
      write_velocity   = true
      first            = 0.0
      interval         = 4.8
      center           = []
      glue             = []
      mode             = noclobber     # append | noclobber | clobber
      periodicfix      = true
      write_last_step  = true
      write_first_step = true
      frames_per_file  = 250
      write_last_vel   = false
   }

   maeff_output = {
      # note - this is not a real plugin in GPU desmond. instead, this info is
      # read by the function extract_cms_impl in desmondbackend.py
      type             = maeff_output
      first            = 0.0
      interval         = 120.0
      name             = ""
      write_last_step  = true
      periodicfix      = true
      precision        = 8
      full_system_only = false
      trjdir           = ""
      center_atoms     = ""
   }

   maeff_snapshot = {
      type     = maeff_snapshot
      name     = ""
      first    = 0.0
      interval = 1.2
   }

   randomize_velocities = {
      type        = randomize_velocities
      first       = 0.0
      interval    = inf
      seed        = 2007
      temperature = 300.0
   }

   simbox_output = {
      type     = simbox_output
      name     = ""
      first    = 0.0
      interval = 1.2
   }

   energy_groups = {
      type         = energy_groups
      name         = ""
      first        = 0.0
      interval     = 1.2
      options      = [corr_energy ]        # [pressure_tensor corr_energy self_energy]
      write_report = true
   }

   remove_com_motion = {
      type     = remove_com_motion
      first    = 0.0
      interval = inf
   }

   ptensor = {
      type         = ptensor
      name         = ""
      first        = 0.0
      interval     = 1.2
      print_volume = false
   }

   dipole_moment = {
      type         = dipole_moment
      first        = 0.0
      interval     = 1.2
      name         = ""
   }
}


CHECKPT_COMMON = {
   name             = "checkpt.cpt"
   first            = 0.0
   interval         = 240.06
   wall_interval    = 7200.0
   write_first_step = false
   write_last_step  = true
}


mdsim = {
   title     = "Desmond MD simulation"
   last_time =  1200.0               # In the unit of ps
   checkpt   = "@*.CHECKPT_COMMON"
   plugin    = "@*.PLUGIN_COMMON"
}

reinit = {
   title     = "Desmond reinit/MD simulation"
   last_time = inf
   checkpt   = "@*.CHECKPT_COMMON"
   plugin    = "@*.PLUGIN_COMMON"
   frameset = {
       first    = 0.0
       interval = 1200.0
       configurations_file = ""
   }
}

remd = {
   title     = "Desmond REMD simulation"
   last_time = 1200.0
   first     = 1.2
   interval  = 1.2
   seed      = 2012                  # random number seed
   checkpt   = "@*.CHECKPT_COMMON"
   plugin    = "@*.PLUGIN_COMMON"
   graph     = {
      edges = {type = linear nodes = []}
   }
   deltaE        = {
      first    = "@*.first"
      interval = "@*.interval"
      name     = "deltaE.txt"
   }
   exchange_dat = {
      name     = "exchange.dat"
   }
}

vrun = {
   title    = "Desmond Vrun"
   input    = frameset               # bootfile | frameset (|stdin to be a feature added later)
   checkpt  = "@*.CHECKPT_COMMON"
   plugin   = "@*.PLUGIN_COMMON"
   last_time= inf
   frameset = {
      first     = 0.0
      interval  = 0.0
      name      = ""
      last_time = "@*.last_time"
   }
}

gui.ewald_tol = 1E-10

}""" % (GCMC_BATCH_SIZE_DEFAULT)

get_default_setting = config.get_default_setting
parse_lambda = config.parse_lambda
get_exec_name = config.get_exec_name
get_exec = config.get_exec
is_concat = config.is_concat
is_minimize = config.is_minimize
is_meta = config.is_meta
__set_meta_file = config.__set_meta_file
__set_meta = config.__set_meta
__set_reinit_frameset = config.__set_reinit_frameset
is_vrun = config.is_vrun
is_reinit = config.is_reinit
is_remd = config.is_remd
has_plugin = config.has_plugin
remove_plugin = config.remove_plugin
get_homebox = config.get_homebox
is_triclinic_box = config.is_triclinic_box


[docs]def add_plugin(desmond_exec, plugin_name, position=None): """ Wrap config.add_plugin, making sure to place added plugins before GCMC """ # by default we always keep GCMC plugin at the end because it modifies the # system and can produce seemingly inconsistent outputs between different # reporting plugins if it executes between them. This is a sensible default, # not a requirement. if position is None and 'gcmc' in desmond_exec.plugin.list: position = desmond_exec.plugin.list.index('gcmc') config.add_plugin(desmond_exec, plugin_name, position)
[docs]def real_to_int(val): """ round up to the closest integer when the difference is less than 1e-7 """ return int(val + 1.0e-7)
[docs]def optimize_key(msj, cfg, box: List[float]): """ Optimizes the simulation parameters in 'cfg', where 'cfg' must represent a complete config file. :param box: Simulation box dimensions. """ import math r_cut = cfg.force.nonbonded.r_cut.val cpu = cfg.global_cell.partition.val dt = cfg.integrator.dt.val far_ts = cfg.integrator.respa.far_timesteps.val out_dt = dt * cfg.integrator.respa.outer_timesteps.val far_dt = dt * far_ts epsilon = cfg.gui.ewald_tol.val # Below is the idea how to optimize the margin. -YW # 1. The "margin" parameter's value can be optimized as k * rmsm + C, where k is a prefactor, rmsm is the root mean square # motion, C is the maximum distance that a particle can travel without colliding to another one. Below, let's first # consider the root mean square motion, and then k and C. # 2. The root-mean-square motion (rmsm) can be calculated from the diffusion constant using the equation: # rmsm = sqrt( <x^2> - <x>^2 ) = sqrt( 6 * D * dt ) # where D is diffusion constant, and dt is the time interval. # 3. How to get dt and D? # - dt is easy. We just get its value from `cfg' since it is a settable parameter. # - D is a little bit troublesome. First of all, different molecules have different diffusion constants. In principle, # We should use the largest D, but unfortunately we don't rigorously know what the largest D is. Nevertheless, for # aqueous systems, it is probably safe to assume water is the most diffusive molecule. So we use water's D. # - A more difficult problem is that D is temperature-dependent. For water molecules, D = 0.0187 (T = 242.5 K), # 0.105 (T = 273.5 K), 0.223 (T = 298.0 K), 0.3575 (T = 318.0 K). The unit of D here is A^2/ps. So we need to somehow # predict D based on the temperature. For this, we can use the Frenkel-Eyring theory, which basically says # D = A * exp( -E / RT ) # where A and E presumably are constants (this assumption is an oversimplification, but should be good enough for our # purpose here). By fitting to the experimental data, we get A = 807.015 A^2/ps and E = 4.87 kcal/mol. # 4. For very high temperature, D is overestimated a lot by the Frenkel-Eyring theory. So we use the following empirical # formula to correct the result: D = D - 0.03 * (T - 350), for T > 350. # 5. How do we choose value for the prefactor k? # - First of all, commonly used water models such as TIP3P and SPC tend to overestimate water diffusion constant. So to # be safe, we should assume the D of water model is at least 2 times the experimental value. # - Second, remember that mean square displacement formula for D is correct only for time -> inf; for time -> 0, the # value would be much larger, presumedly ~100% larger. # - The above thoughts lead to a working value for diffusion constant = 4 D, where D is the experimental value. So this # gives k = sqrt( 4 ) = 2. # 6. How about C? # - For the value of C, we can look at the radial distribution function. The width of the first peak is the maximum # radial space that first shell of water molecules can occupy. Because for any direction only one molecule can occupy # that radial space, we can safely take the width as the value of C. For liquid water, the value is ~0.4 Angstrom. # Gets the highest temperature. desmond_exec = cfg[cfg.app.val] if (has_plugin(desmond_exec, "anneal")): temp = max(desmond_exec.plugin.anneal.schedule.value.val) else: if ("T_groups" in cfg.integrator.temperature): # FIXME: This won't work for gDesmond temp_list = [] for e in cfg.integrator.temperature.T_groups: temp_list.append(e.T_ref.val) temp = max(temp_list) else: temp = cfg.integrator.temperature.T_ref.val box_x, box_y, box_z = cms.get_boxsize(box) homebox = get_homebox(box, cpu) diffusion = 807.015 * math.exp(-4.87 / 1.986E-3 / temp) - 0.03 * max( 0, temp - 350) migration_interval = cfg.migration.interval.val # Adjusts the migration interval to be `N x out_dt', where `N' is an integer number >= 1. N = real_to_int(old_div(migration_interval, out_dt)) N = 1 if (1 > N) else N cfg.integrator.respa.migrate_timesteps.val = N * cfg.integrator.respa.outer_timesteps.val migration_interval = N * out_dt rmsm = math.sqrt(6 * diffusion * migration_interval) margin = rmsm * 2.0 + 0.8 # We don't want to transfer a small amount of data between nodes very frequently. This is because of the transfer latency # overhead, which might exceed the actual data-transferring time. On the other hand we don't want a big margin either, # because a lot of particle pairs would have distances larger than the cutoff. This thought leads to a further checking: # If the margin is still ``big'' and the migration interval can be decreased, we will reduce the margin further more. # How ``big'' is ``big''? # - An arbitrary yet reasonable choice is this: If the margin causes > 50% extra computation, we consider it as big. # - pow( 1.5, 1.0/3.0 ) = 1.1447142425533319 while (margin > r_cut * 0.1447142425533319 and migration_interval >= far_dt * 1.9): migration_interval -= far_dt # Adjusts the migration interval to be `N x out_dt', where `N' is an integer number >= 1. N = real_to_int(old_div(migration_interval, out_dt)) N = 1 if (1 > N) else N cfg.integrator.respa.migrate_timesteps.val = N * cfg.integrator.respa.outer_timesteps.val migration_interval = N * out_dt rmsm = math.sqrt(6 * diffusion * migration_interval) margin = rmsm * 2.0 + 0.8 r_clone = (r_cut + margin) * 0.5 cfg.global_cell.margin.val = margin cfg.migration.interval.val = migration_interval r_cut4 = r_cut * 4 if (is_triclinic_box(box) and (box_x < r_cut4 or box_y < r_cut4 or box_y < r_cut4)): cfg.global_cell.clone_policy.val = "unrounded" else: cfg.global_cell.clone_policy.val = "rounded" if (cfg.force.nonbonded.far.type.val != "none"): toler = math.sqrt(abs(math.log(epsilon * r_cut))) sigma = old_div(math.sqrt(abs(math.log(epsilon * r_cut * toler))), r_cut) toler = math.sqrt(-math.log(epsilon * r_cut * (2.0 * toler * sigma)**2)) fac = sigma * toler / 3.1415926 n_k = [ 0.25 + fac * box_x, 0.25 + fac * box_y, 0.25 + fac * box_z, ] for i, n in enumerate(n_k): p = cpu[i] n = int(math.ceil(n)) # Each grid dimension must satisfy the following conditions: # 1. if p is given, it must divide n # 2. n must have no integer factors greater than 2, 3, or 5 while (True): # check that n is commensurate with p if (p): if (n % p): n += 1 continue N = n for radix in ( 2, 3, 5, ): while (not N % radix): N /= radix if (N == 1): break n += 1 continue n_k[i] = n cfg.force.nonbonded.far.n_k.val = n_k cfg.force.nonbonded.sigma.val = old_div(1, sigma) pme_order = cfg.force.nonbonded.far.order.val margin_ = old_div(margin, 1.414) if (cfg.force.nonbonded.far.type.val[:3] == "gse"): r_clone = max(r_clone, cfg.force.nonbonded.far.r_spread.val + margin_) r_lazy = r_clone * 2.0 elif (cfg.force.nonbonded.far.type.val[:3] == "pme"): a = cpu[0] * (pme_order[0] - 1) * 0.5 / n_k[0] b = cpu[1] * (pme_order[1] - 1) * 0.5 / n_k[1] c = cpu[2] * (pme_order[2] - 1) * 0.5 / n_k[2] tmpx = homebox[0] * a + homebox[1] * b + homebox[2] * c tmpy = homebox[3] * a + homebox[4] * b + homebox[5] * c tmpz = homebox[6] * a + homebox[7] * b + homebox[8] * c r_pme = math.sqrt(tmpx * tmpx + tmpy * tmpy + tmpz * tmpz) + margin_ r_clone = max(r_clone, r_pme + 0.01) mmc = cfg.integrator.pressure.max_margin_contraction.val try: if (msj.bigger_rclone.val): r_clone /= mmc except AttributeError: pass r_lazy = r_clone * 2.0 if (r_lazy > old_div(box_x, cpu[0]) and r_lazy > old_div(box_y, cpu[1]) and r_lazy > old_div(box_z, cpu[2])): print("WARNING: Cutoff radius is too large relative to box size.\n" \ " Try either increasing box size or reducing cutoff radius.") cfg.global_cell.r_clone.val = r_clone + 1E-6 cfg.force.nonbonded.r_lazy.val = cfg.global_cell.r_clone.val * 2
default_flex_vdw = r""" { vdwA = "@*.*.weights.vdwA" vdwB = "@*.*.weights.vdwB" qA = "@*.*.weights.qA" qB = "@*.*.weights.qB" name = "%s"} """ default_flex_bond = r""" { A = "@*.*.weights.bondA" B = "@*.*.weights.bondB" name = "%s" } """ default_flex_dihe = r""" { A = "@*.*.flexible[0].A" B = "@*.*.flexible[0].B" name = "%s"} """ def __get_epsilon_lambda_schedule(n_win, B=False): """ generate lambda schedule for epsilons in soft-core potentials n_win: number of windows B: final state """ tmp_list = [0.1] * n_win if B: tmp_list[0] = 0.0 else: tmp_list[-1] = 0.0 return sea.List(tmp_list)
[docs]class LambdaSchedule: """Base class for lambda schedules.""" lambdas: List[float] = []
[docs] @classmethod def schedule(cls, n_win: int) -> List[float]: """Return schedule with the desired number of windows. This method constructs a piecewise linear spline interpolator of the prototype lambda schedule and evaluates it on the specified number of windows to obtain the new lambda schedule. :param n_win: Number of lambda windows to use. :return: New lambda schedule. """ lambdas = cls.lambdas values = np.linspace(0.0, 1.0, len(lambdas)) interp = scipy.interpolate.interp1d(values, lambdas, kind='slinear') new_values = np.linspace(0.0, 1.0, n_win) return interp(new_values).tolist()
[docs]class ScheduleAngleCorePhysical(LambdaSchedule): """Schedule for angle_core_physical terms.""" lambdas = [ 1.0, 0.8, 0.6, 0.35, 0.2, 0.1, 0.03, 0.01, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 ]
[docs]class ScheduleAngleCoreDummy(LambdaSchedule): """Schedule for angle_core_dummy terms.""" lambdas = [ 1.0, 0.8, 0.65, 0.5, 0.4, 0.3, 0.2, 0.16, 0.12, 0.08, 0.04, 0.01, 0.0, 0.0, 0.0, 0.0 ]
[docs]class ScheduleBondCorePhysical(LambdaSchedule): """Schedule for bond_core_physical terms.""" lambdas = [ 1.0, 0.3, 0.1, 0.05, 0.03, 0.01, 0.002, 0.001, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 ]
[docs]class ScheduleBondCoreDummy(LambdaSchedule): """Schedule for bond_core_dummy terms.""" lambdas = [ 1.0, 0.6, 0.35, 0.22, 0.18, 0.14, 0.12, 0.11, 0.09, 0.08, 0.06, 0.02, 0.005, 0.0, 0.0, 0.0 ]
[docs]class ScheduleAngleAttachment(LambdaSchedule): """Schedule for angle_attachment terms.""" lambdas = [ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.95, 0.9, 0.85, 0.8, 0.75, 0.6, 0.35, 0.1, 0.0 ]
[docs]class ScheduleLinear(LambdaSchedule): """Linear lambda schedule.""" lambdas = [1.0, 0.0]
def __get_flexible_lambda_schedule(n_win): """ Return sea.List object for flexible lambda schedules """ def sched2str(schedule: List[float]) -> str: s = [str(sched) for sched in schedule] return ' '.join(s), ' '.join(reversed(s)) angle_core_physical = ScheduleAngleCorePhysical.schedule(n_win) angle_core_physicalA, angle_core_physicalB = sched2str(angle_core_physical) angle_core_dummy = ScheduleAngleCoreDummy.schedule(n_win) angle_core_dummyA, angle_core_dummyB = sched2str(angle_core_dummy) bond_core_physical = ScheduleBondCorePhysical.schedule(n_win) bond_core_physicalA, bond_core_physicalB = sched2str(bond_core_physical) bond_core_dummy = ScheduleBondCoreDummy.schedule(n_win) bond_core_dummyA, bond_core_dummyB = sched2str(bond_core_dummy) angle_attachment = ScheduleAngleAttachment.schedule(n_win) angle_attachmentA, angle_attachmentB = sched2str(angle_attachment) alchemical_restraint = ScheduleLinear.schedule(n_win) alchemical_restraintA, alchemical_restraintB = sched2str( alchemical_restraint) flex_lambda_list = [ f""" {{ A = [ {angle_core_physicalA}] B = [ {angle_core_physicalB}] name = "angle_core_physical" }} """, f""" {{ A = [ {angle_core_dummyA}] B = [ {angle_core_dummyB}] name = "angle_core_dummy" }} """, f""" {{ A = [ {bond_core_physicalA}] B = [ {bond_core_physicalB}] name = "bond_core_physical" }} """, f""" {{ A = [ {bond_core_dummyA}] B = [ {bond_core_dummyB}] name = "bond_core_dummy" }} """, f""" {{ A = [ {angle_attachmentA}] B = [ {angle_attachmentB}] name = "angle_attachment" }} """, f""" {{ A = [ {alchemical_restraintA}] B = [ {alchemical_restraintB}] name = "alchemical_restraint" }} """ ] flex_lambda_list += [ default_flex_dihe % x for x in ["dihedral_core_physical", "dihedral_core_dummy"] ] flex_lambda_list += [ default_flex_bond % x for x in ["dihedral_attachment", "dihedral_restraint"] ] flex_lambda_list += [ default_flex_vdw % x for x in [ "excluded2pair", "pair2nonbonded", "excluded2nonbonded", "nonbonded2pair", "pair2excluded", "nonbonded2excluded", "unmatched" ] ] flex_sea = sea.List(list(map(sea.Map, flex_lambda_list))) for sched in flex_sea: if sched.name.val in [ "angle_core_physical", "angle_core_dummy", "dihedral_core_physical", "dihedral_core_dummy", "dihedral_attachment", "angle_attachment", "dihedral_restraint" ]: sched.EpsA = __get_epsilon_lambda_schedule(n_win) sched.EpsB = __get_epsilon_lambda_schedule(n_win, B=True) return flex_sea def __set_fep(msj, cfg, model): """ """ if (model is None): return sys_type = cms.get_model_system_type(model.comp_ct) fep_type = "alchemical" if (sys_type == SystemType.ALCHEMICAL) else "binding" cfg.force.nonbonded.near["type"] = fep_type + ":softcore" far_type = cfg.force.nonbonded.far.type.val if (fep_type == "binding" and far_type in ["pme", "gse", "QuadS"]): cfg.force.nonbonded.far.type.val = "binding:" + far_type if (cfg.force.nonbonded.type.val == "useries"): cfg.force.nonbonded.near.type.val += "-minimax" if (sys_type == SystemType.OTHER): return desmond_exec = get_exec(msj, cfg) if ("gibbs" not in desmond_exec.plugin.list.val): cfg.force.term.list.append("gibbs") gibbs = cfg.force.term.gibbs gibbs.type.val = fep_type gibbs.output.name.val = msj.fep.output.name.val gibbs.output.first.val = msj.fep.output.first.val gibbs.output.interval.val = msj.fep.output.interval.val if (msj.fep.i_window.val is not None): gibbs.window.val = msj.fep.i_window.val if (isinstance(msj.fep.lambda_, sea.Atom)): scheme, n_win = parse_lambda(msj.fep.lambda_.val) if (n_win < 4): raise ValueError("Total number of windows should be 4 at least") fs = config.get_fep_lambdas(n_win, fep_type, scheme) la = gibbs.weights la["vdw"] = fs.vdw la["es"] = fs.coulomb la["vdwA"] = fs.vdwA la["vdwB"] = fs.vdwB la["qA"] = fs.chargeA la["qB"] = fs.chargeB la["qC"] = [1 - eA - eB for eA, eB in zip(fs.chargeA, fs.chargeB)] la["bondA"] = fs.bondedA la["bondB"] = fs.bondedB la["N"] = n_win if scheme == "flexible": gibbs.flexible = __get_flexible_lambda_schedule(n_win) else: if (fep_type != config.lambda_type(msj.fep.lambda_)): raise ValueError("Wrong lambda schedule type") if (fep_type == "alchemical"): fs = msj.fep.lambda_ la = gibbs.weights la["vdwA"] = fs.vdwA la["vdwB"] = fs.vdwB la["qA"] = fs.chargeA la["qB"] = fs.chargeB la["qC"] = [ 1 - eA.val - eB.val for eA, eB in zip(fs.chargeA, fs.chargeB) ] la["bondA"] = fs.bondedA la["bondB"] = fs.bondedB la["N"] = len(fs.vdwA) else: fs = msj.fep.lambda_ la = gibbs.weights la["vdw"] = fs.vdw la["es"] = fs.coulomb la["N"] = len(fs.vdw) def __set_cutoff_radius(msj, cfg, model): """ """ cfg.force.nonbonded.r_cut.val = msj.cutoff_radius.val def __set_bigger_rclone(msj, cfg, model): """ """ def __set_taper(msj, cfg, model): """ """ default_taper_setting = sea.Map("method = shift width = 1.0") if (isinstance(msj.taper, sea.Atom)): if (msj.taper.val): cfg.force.nonbonded.near.taper.val = default_taper_setting.method.val cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - default_taper_setting.width.val else: cfg.force.nonbonded.near.taper.val = "none" cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val else: custom_taper_setting = msj.taper msj.taper = default_taper_setting msj.taper.update(custom_taper_setting) cfg.force.nonbonded.near.taper.val = msj.taper.method.val cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - msj.taper.width.val if (cfg.force.nonbonded.near.r_tap.val < 0): raise ValueError("Invalid value for 'r_tap': %f" % cfg.force.nonbonded.near.r_tap.val) def __set_coulomb_method(msj, cfg, model): """ """ if (isinstance(msj.coulomb_method, sea.Atom)): if (msj.coulomb_method.val == "pme"): cfg.force.nonbonded.type.val = "ewald" cfg.force.nonbonded.near.type.val = "default" cfg.force.nonbonded.far.type.val = "pme" elif (msj.coulomb_method.val == "useries"): if model and is_triclinic_box(model.box): # Automatically resets the method to "pme" if the system uses a # triclinic box because the useries method currently doesn't # work with triclinic box. DESMOND-6842. print( "NOTE: The simulation box is triclinic. Changed coulomb_method from 'useries'\n" " to 'pme'.") msj.coulomb_method.val = "pme" return __set_coulomb_method(msj, cfg, model) cfg.force.nonbonded.type.val = "useries" cfg.force.nonbonded.near.type.val = "minimax" cfg.force.nonbonded.far.type.val = "QuadS" else: cfg.force.nonbonded.type.val = "vacuum" cfg.force.nonbonded.far.type.val = "none" cfg.force.nonbonded.sigma.val = "inf" else: method_name, ewald_tol = msj.coulomb_method.val if (method_name == "pme"): cfg.force.nonbonded.type.val = "ewald" cfg.force.nonbonded.near.type.val = "default" cfg.force.nonbonded.far.type.val = "pme" cfg.gui.ewald_tol.val = ewald_tol else: raise ValueError("Unknown coulomb method: %s" % method_name) def __set_temperature(msj, cfg, model): """ """ if ("annealing" in msj and msj.annealing.val): cfg.mdsim.plugin.anneal.schedule["time"] = sea.List() cfg.mdsim.plugin.anneal.schedule["value"] = sea.List() time = cfg.mdsim.plugin.anneal.schedule.time value = cfg.mdsim.plugin.anneal.schedule.value for e in msj.temperature: value.append(e[0].val) time.append(e[1].val) else: if (isinstance(msj.temperature, sea.Atom)): cfg.integrator.temperature.T_ref.val = msj.temperature.val else: # This part probably does not work. But we don't know the correct syntax. temperature = sea.Map() if (1 < len(msj.temperature)): t_list = sea.List() for e in msj.temperature: temp = sea.Map() temp["T_ref"] = e[0].val temp["groups"] = sea.List([ e[1].val, ]) t_list.append(temp) temperature["T_groups"] = t_list temperature["T_ref"] = msj.temperature[0][0].val cfg.integrator["temperature"] = temperature # Low temperature of the barostat can result in unstable simulations. # We set the minimum temperature of the barostat to 100 K. # For detail, refer to DESMOND-10477 and DESMOND-10543. barostat = cfg.integrator.Multigrator.barostat if isinstance(barostat, sea.Map): barostat.temperature.T_ref.val = max( 100.0, cfg.integrator.temperature.T_ref.val) def __set_annealing(msj, cfg, model): """ """ if (msj.annealing.val): add_plugin(cfg.mdsim, "anneal") else: remove_plugin(cfg.mdsim, "anneal") def __set_pressure(msj, cfg, model): """ """ if (isinstance(msj.pressure, sea.Atom)): cfg.integrator.pressure.P_ref.val = msj.pressure.val else: cfg.integrator.pressure.P_ref.val = msj.pressure[0].val if (isinstance(msj.ensemble, sea.Atom)): if (msj.ensemble.val in [ "NPT", "NPT_L", ]): cfg.integrator.pressure.isotropy.val = msj.pressure[1].val else: if (msj.ensemble.class_.val == "NPT"): cfg.integrator.pressure.isotropy.val = msj.pressure[1].val def __set_surface_tension(msj, cfg, model): """ """ if ((isinstance(msj.ensemble, sea.Atom) and msj.ensemble.val == "NPgT") or (isinstance(msj.ensemble, sea.Map) and msj.ensemble.class_.val == "NPgT")): cfg.integrator.pressure.tension_ref.val = msj.surface_tension.val else: return if ("tension_ref" in cfg.integrator.pressure): del cfg.integrator.pressure["tension_ref"]
[docs]def get_ensemble_class_method(msj): """ """ if (isinstance(msj.ensemble, sea.Atom)): ens_type_map = { "NPT": ( "NPT", "MTK", ), "NVT": ( "NVT", "NH", ), "NVE": ( "NVE", "NH", ), # Method for "NVE" will be ignored. "NPgT": ( "NPgT", "MTK", ), "NPAT": ( "NPAT", "MTK", ), "NPT_L": ( "NPT", "Langevin", ), "NVT_L": ( "NVT", "Langevin", ), "NPT_DPD": ( "NPT", "DPD", ), "NVT_DPD": ( "NVT", "DPD", ), } return ens_type_map[msj.ensemble.val] else: return ( msj.ensemble.class_.val, msj.ensemble.method.val, )
def __set_ensemble(msj, cfg, model): """ """ integrator = cfg.integrator ens = "NVE" tau = None if (isinstance(msj.ensemble, sea.Atom)): ens_type_map = { "NPT": "MTK_NPT", "NVT": "NH_NVT", "NVE": "V_NVE", "NPgT": "MTK_NPT", "NPAT": "MTK_NPT", "NPT_L": "L_NPT", "NVT_L": "L_NVT", "NPT_Brownie": "brownie_NPT", "NVT_Brownie": "brownie_NVT", "NPT_DPD": "DPD_NPT", # FIXME: should likely be DPD_NVT "NVT_DPD": "DPD", } ens = key = msj.ensemble.val else: if (msj.ensemble.class_.val == "NVE"): ens_type_map = { "NVE": "V_NVE", } key = msj.ensemble.class_.val else: # FIXME: Check the consistency between the "class" and "method" parameters when job is launched. ens_type_map = { ( "NPT", "MTK", ): "MTK_NPT", ( "NPgT", "MTK", ): "MTK_NPT", ( "NPAT", "MTK", ): "MTK_NPT", ( "NVT", "NH", ): "NH_NVT", ( "NPT", "Langevin", ): "L_NPT", ( "NPgT", "Langevin", ): "L_NPT", ( "NPAT", "Langevin", ): "L_NPT", ( "NVT", "Langevin", ): "L_NVT", ( "NPT", "DPD", ): "DPD_NPT", ( "NVT", "DPD", # FIXME: should be DPD_NVT ): "DPD", ( "NPT", "Brownie", ): "brownie_NPT", ( "NVT", "Brownie", ): "brownie_NVT", } ens = msj.ensemble.class_.val method = msj.ensemble.method.val ttau = msj.ensemble.thermostat.tau.val if ( "thermostat" in msj.ensemble and method not in ["Brownie", "DPD"]) else None btau = msj.ensemble.barostat.tau.val if ("barostat" in msj.ensemble) else None key = ( ens, method, ) tau = ( ttau, btau, ) try: t = ens_type_map[key] except KeyError: raise ValueError(f"Unsupported ensemble class or method {key}") if (ens == "NPgT"): integrator.pressure.isotropy.val = "semi_isotropic" elif (ens == "NPAT"): integrator.pressure.isotropy.val = "constant_area" if t in ["V_NVE"]: integrator.Multigrator["thermostat"] = "none" if t in ["NH_NVT", "L_NVT", "V_NVE", "NVT_DPD", "DPD"]: integrator.Multigrator["barostat"] = "none" if (tau is not None): # tau[0] is for thermostat, tau[1] for barostat. if t in ["MTK_NPT", "NH_NVT"]: if (tau[0]): ts = integrator.Multigrator.thermostat.timesteps.val for e in integrator.Multigrator.thermostat.NoseHoover.tau: e.val = old_div(tau[0], float(ts)) if (t == "MTK_NPT"): ts = integrator.Multigrator.barostat.timesteps.val if (tau[0]): for e in integrator.Multigrator.barostat.MTK.thermostat.NoseHoover.tau: e.val = old_div(tau[0], float(ts)) if (tau[1]): integrator.Multigrator.barostat.MTK.tau.val = old_div( tau[1], float(ts)) elif (t == "L_NVT"): integrator.Multigrator.thermostat.type.val = "Langevin" if (tau[0] is not None): ts = integrator.Multigrator.thermostat.timesteps.val integrator.Multigrator.thermostat.Langevin.tau.val = old_div( tau[0], float(ts)) elif t == "DPD": integrator.Multigrator.thermostat.type.val = "DPD" elif (t == "L_NPT"): integrator.Multigrator.thermostat.type.val = "Langevin" integrator.Multigrator.barostat.type.val = "Langevin" if (tau[0] is not None): ts = integrator.Multigrator.thermostat.timesteps.val integrator.Multigrator.thermostat.Langevin.tau.val = old_div( tau[0], float(ts)) if (tau[1] is not None): ts = integrator.Multigrator.barostat.timesteps.val integrator.Multigrator.barostat.Langevin.thermostat.tau.val = old_div( tau[0], float(ts)) integrator.Multigrator.barostat.Langevin.tau.val = old_div( tau[1], float(ts)) elif (t == "brownie_NVT"): integrator.brownie_NVT.delta_max.val = msj.ensemble.brownie.delta_max.val elif (t == "brownie_NPT"): if (tau[1] is not None): integrator.brownie_NPT.barostat.tau.val = tau[1] integrator.brownie_NPT.delta_max.val = msj.ensemble.brownie.delta_max.val if (isinstance(msj.temperature, sea.Atom)): integrator.brownie_NPT.barostat.T_ref.val = msj.temperature.val else: integrator.brownie_NPT.barostat.T_ref.val = msj.temperature[0][ 0].val if (t == "brownie_NVT"): integrator.type.val = "brownie_NVT" elif (t == "brownie_NPT"): integrator.type.val = "brownie_NPT" elif t in ["DPD", "DPD_NPT"]: set_dpd_params(msj, cfg, ensemble=t)
[docs]def set_dpd_params(msj, cfg, ensemble): """ Set parameters for DPD simulation :param str ensemble: name of the ensemble """ # Add special keywords from MATSCI-8136 and DESMOND-9886 cfg.force.nonbonded.far.type.val = "none" cfg.force.nonbonded.near['average_dispersion'] = 0.0 cfg.force.nonbonded.near.correct_average_dispersion.val = "false" cfg.force.nonbonded.near.type.val = 'polynomial' cfg.force.nonbonded.near['seed'] = 2020 cfg.force.nonbonded.near['acceleration'] = 'dpd' cfg.force.nonbonded.near['dt'] = msj.timestep.val[0] cfg.force.nonbonded.near['T_ref'] = get_tempearture(msj) cfg.integrator.Multigrator.nve.type.val = 'Verlet' cfg.integrator.Multigrator['thermostat'] = 'none' if ensemble == "DPD": cfg.integrator.Multigrator.barostat.val = 'none' elif ensemble == "DPD_NPT": cfg.integrator.Multigrator.barostat.type.val = "MTK"
[docs]def get_tempearture(msj): """ get temperature from msj :rtype: float :return: temperature """ return (msj.temperature.val if isinstance(msj.temperature, sea.Atom) else msj.temperature[0][0].val)
def __set_time(msj, cfg, model): """ """ desmond_exec = get_exec(msj, cfg) desmond_exec.last_time.val = msj.time.val def __set_elapsed_time(msj, cfg, model): """ """ cfg.global_cell.reference_time.val = msj.elapsed_time.val def __set_timestep(msj, cfg, model): """ """ timestep = msj.timestep.val cfg.integrator.dt.val = timestep[0] cfg.integrator.respa.near_timesteps.val = int( old_div(timestep[1], timestep[0]) + 0.1) cfg.integrator.respa.far_timesteps.val = int( old_div(timestep[2], timestep[0]) + 0.1) cfg.integrator.respa.outer_timesteps.val = int( old_div(timestep[2], timestep[0]) + 0.1) def __set_cpu(msj, cfg, model): """ """ if (model is None): return if (isinstance(msj.cpu, sea.Atom)): import schrodinger.application.desmond.autopartition as autopartition x, y, z = cms.get_boxsize(model.box) nx, ny, nz = autopartition.auto_partition([ x, y, z, ], msj.cpu.val) else: nx, ny, nz = msj.cpu.val cfg.global_cell["partition"] = ( nx, ny, nz, ) def __set_glue(msj, cfg, model): """ """ def __set_vrun_frameset(msj, cfg, model): """ """ cfg.vrun.frameset.name.val = msj.vrun_frameset.val def __set_gcmc(msj, cfg, model): """ """ desmond_exec = get_exec(msj, cfg) gcmc = desmond_exec.plugin.gcmc mgcmc = msj.gcmc # set up plugin interval gcmc.first.val = mgcmc.first.val gcmc.interval.val = mgcmc.interval.val # set up eneseq gcmc.eneseq.name.val = mgcmc.ene_name.val # setup moves moves = mgcmc.moves gcmc.nsteps.val = moves.moves_per_cycle.val # disabled until functionality restored # move_ratio_str = moves.trans_to_ins_del_ratio.val # num_trans, num_ins_del = map(int, move_ratio_str.split(':')) # gcmc.move_fraction.val = float(num_trans) / (num_trans + num_ins_del) # gcmc.post_insertion_moves.val = \ # moves.post_insertion_translation_moves.val # simulation values gcmc.mu_excess.val = mgcmc.mu_excess.val gcmc.solvent_density.val = mgcmc.solvent_density.val # setup grid gcmc.grid.spacing.val = mgcmc.gcmc_region.cell_size.val gcmc.grid.exclusion_radius.val = mgcmc.gcmc_region.exclusion_radius.val gcmc.grid.track_voids.val = mgcmc.gcmc_region.track_voids.val gcmc.grid.region_buffer.val = mgcmc.gcmc_region.region_buffer.val # check if deprecated spec is passed if 'whole_box_frequency' in mgcmc.gcmc_region: if 'global_switching' in mgcmc.gcmc_region: raise ValueError( "gcmc.gcmc_region: Only one of global_switching " "and deprecated whole_box_frequency can be defined") else: global_switching = sea.Map() global_switching.frequency = deepcopy( mgcmc.gcmc_region.whole_box_frequency) else: global_switching = deepcopy(mgcmc.gcmc_region.global_switching) # names should be the same - explicitly loop over keys instead of copying # entire map to ensure keys are equivalent for k in global_switching.keys(): gcmc.grid.global_switching[k].val = global_switching[k].val gcmc.verbose.val = int(mgcmc.verbose.val) # setup random seed seed = mgcmc.seed.val if seed == 'random': # in case someone else has seeded the generator state = np.random.get_state() np.random.seed() seed = np.random.randint(np.iinfo(np.int32).max) # being a good neighbor np.random.set_state(state) gcmc.seed.val = seed # set temperature from global msj value ref_temp = msj.temperature.val if (isinstance( msj.temperature, sea.Atom)) else msj.temperature[0][0].val gcmc.temperature.val = ref_temp add_plugin(desmond_exec, 'gcmc') # if we use the GCMC plugin, we should tell energy_groups to output gcmc # info. we can do this whether or not energy_groups is actually enabled (ie # added to plugin.list) if 'energy_groups' in desmond_exec.plugin: desmond_exec.plugin.energy_groups.options.append('gcmc_info') def _skip_replica_trajectory(msj: sea.Map) -> bool: """ Return True if the trajectory should not be written for this replica based on the `fep.trajetory.record_windows` setting. """ try: fep_traj = msj.fep.trajectory.record_windows.val i_window = msj.fep.i_window.val except AttributeError: pass else: if i_window is not None: if isinstance(msj.fep.lambda_, sea.Atom): _, n_win = parse_lambda(msj.fep.lambda_.val) else: fs = msj.fep.lambda_ n_win = len( fs.vdwA) if config.lambda_type(fs) == "alchemical" else len( fs.vdw) if isinstance(fep_traj, str): if fep_traj == "all": fep_traj = list(range(n_win)) else: fep_traj = [x if x >= 0 else n_win + x for x in fep_traj] if i_window not in fep_traj: return True return False def __set_trajectory(msj, cfg, model): """ """ from schrodinger.application.desmond.packages import restraint if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.trajectory, sea.Atom)): remove_plugin(desmond_exec, "trajectory") return if _skip_replica_trajectory(msj): for t_val in [msj.trajectory.first, msj.trajectory.interval]: # use update in case t_val is of int type t_val.update(sea.Atom(float("inf"))) desmond_exec.plugin.trajectory.first.val = msj.trajectory.first.val desmond_exec.plugin.trajectory.interval.val = msj.trajectory.interval.val desmond_exec.plugin.trajectory.periodicfix.val = msj.trajectory.periodicfix.val desmond_exec.plugin.trajectory.write_velocity.val = msj.trajectory.write_velocity.val has_last_vel = getattr(msj.trajectory, 'write_last_vel', False) if has_last_vel: desmond_exec.plugin.trajectory.write_last_vel.val = msj.trajectory.write_last_vel.val try: desmond_exec.plugin.trajectory.frames_per_file.val = msj.trajectory.frames_per_file.val except AttributeError: pass try: center = msj.trajectory.center if (isinstance(center, sea.List)): desmond_exec.plugin.trajectory.center.val = [ model.gid(e) for e in center.val ] else: desmond_exec.plugin.trajectory.center.val = [ model.gid(e) for e in model.select_atom(center.val) ] except AttributeError: pass # Centering will mess up positional restraints, so disable if any are present if model is not None and restraint.has_positional_restraints(model): desmond_exec.plugin.trajectory.center.val = [] try: format = "dtr" format = msj.trajectory.format.val desmond_exec.plugin.trajectory.format.val = format.lower() except AttributeError: pass # Adjusts trajectory file name for its consistency with the format setting. traj_fname = msj.trajectory.name.val if format == "dtr": # DTR doesn't have an extension name. Conventionally, we name after the # pattern: <jobname>_trj. if ".xtc" == traj_fname[-4:]: traj_fname = traj_fname[:-4] + "_trj" elif format == "xtc": if "_trj" == traj_fname[-4:]: traj_fname = traj_fname[:-4] + ".xtc" elif ".xtc" != traj_fname[-4:]: traj_fname = traj_fname + ".xtc" if traj_fname != msj.trajectory.name.val: print("NOTE: Renamed trajectory file name to %s to be consistent\n" " with the specified format: %s." % (traj_fname, format)) desmond_exec.plugin.trajectory.name.val = traj_fname # Set up the trajectory name to store the last frame velocity if has_last_vel: desmond_exec.plugin.trajectory['last_vel_name'] = sea.Atom( f'{traj_fname[:-4]}_vel_trj') def __set_eneseq(msj, cfg, model): """ """ desmond_exec = get_exec(msj, cfg) if (isinstance(msj.eneseq, sea.Atom)): remove_plugin(desmond_exec, "eneseq") return desmond_exec.plugin.eneseq.name.val = msj.eneseq.name.val desmond_exec.plugin.eneseq.first.val = msj.eneseq.first.val desmond_exec.plugin.eneseq.interval.val = msj.eneseq.interval.val try: desmond_exec.plugin.eneseq["precision"] = msj.eneseq.precision.val except AttributeError: pass def __set_checkpt(msj, cfg, model): """ """ if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.checkpt, sea.Atom)): if (msj.checkpt.val): __set_checkpt(get_default_setting(msj), cfg, model) else: desmond_exec.checkpt.first.val = "inf" desmond_exec.checkpt.interval.val = "inf" desmond_exec.checkpt.wall_interval.val = "inf" desmond_exec.checkpt.write_last_step.val = False else: desmond_exec.checkpt.name.val = msj.checkpt.name.val desmond_exec.checkpt.write_last_step.val = msj.checkpt.write_last_step.val if ("wall_interval" in msj.checkpt and desmond_exec.checkpt.wall_interval.val != float("inf")): desmond_exec.checkpt.first.val = "inf" desmond_exec.checkpt.interval.val = "inf" desmond_exec.checkpt.wall_interval.val = msj.checkpt.wall_interval.val else: desmond_exec.checkpt.first.val = msj.checkpt.first.val desmond_exec.checkpt.interval.val = msj.checkpt.interval.val desmond_exec.checkpt.wall_interval.val = "inf" def __set_maeff_output(msj, cfg, model): """ """ from schrodinger.application.desmond.packages import restraint desmond_exec = get_exec(msj, cfg) if isinstance(msj.maeff_output, sea.Atom) or (is_remd(msj) and _skip_replica_trajectory(msj)): remove_plugin(desmond_exec, "maeff_output") return desmond_exec.plugin.maeff_output.name.val = msj.maeff_output.name.val desmond_exec.plugin.maeff_output.first.val = msj.maeff_output.first.val desmond_exec.plugin.maeff_output.interval.val = msj.maeff_output.interval.val try: # If user runs desmond directly without multisim and if the center_atoms # is not set in the input .cfg file, we will get an AttributeError. desmond_exec.plugin.maeff_output[ "center_atoms"] = msj.maeff_output.center_atoms.val except AttributeError: desmond_exec.plugin.maeff_output[ "center_atoms"] = config.DEFAULT_SETTING.MD.maeff_output.center_atoms.val # Centering will mess up positional restraints, so disable if any are present if model is not None and restraint.has_positional_restraints(model): desmond_exec.plugin.maeff_output["center_atoms"] = sea.List() if is_minimize(msj): desmond_exec.plugin.maeff_output.trjdir.val = "" else: # read trjidx key to support front-end cfg files generated with # prior versions (<15-4) if 'trjidx' in msj.maeff_output: desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjidx.val.replace( '-out.idx', '_trj') else: desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjdir.val def __set_randomize_velocity(msj, cfg, model): """ """ if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.randomize_velocity, sea.Atom)): if (msj.randomize_velocity.val): __set_randomize_velocity(get_default_setting(msj), cfg, model) else: remove_plugin(desmond_exec, "randomize_velocity") else: desmond_exec.plugin.randomize_velocities.first.val = msj.randomize_velocity.first.val desmond_exec.plugin.randomize_velocities.interval.val = msj.randomize_velocity.interval.val desmond_exec.plugin.remove_com_motion.first.val = msj.randomize_velocity.first.val desmond_exec.plugin.remove_com_motion.interval.val = msj.randomize_velocity.interval.val try: desmond_exec.plugin.randomize_velocities.seed.val = msj.randomize_velocity.seed.val except AttributeError: pass temp = msj.randomize_velocity.temperature.val if (isinstance(temp, list)): desmond_exec.plugin.randomize_velocities.temperature.val = temp[0][ 0] else: desmond_exec.plugin.randomize_velocities.temperature.val = temp com_idx = None if has_plugin(desmond_exec, 'remove_com_motion'): com_idx = desmond_exec.plugin.list.val.index('remove_com_motion') add_plugin(desmond_exec, 'randomize_velocities', com_idx) def __set_simbox(msj, cfg, model): """ """ return if (is_minimize(msj)): return desmond_exec = get_exec(msj, cfg) if (isinstance(msj.simbox, sea.Atom)): if (msj.simbox.val): __set_simbox(config.get_default_setting(msj), cfg, model) else: remove_plugin(desmond_exec, "simbox_output") else: desmond_exec.plugin.simbox_output.name.val = msj.simbox.name.val desmond_exec.plugin.simbox_output.first.val = msj.simbox.first.val desmond_exec.plugin.simbox_output.interval.val = msj.simbox.interval.val add_plugin(desmond_exec, "simbox_output")
[docs]def get_simbox_output_filename(msj): """ Returns 'None' if the simbox_output plugin is turned off. """ if (is_minimize(msj)): return None if (isinstance(msj.simbox, sea.Atom)): if (sea.simbox.val): return get_simbox_output_filename(get_default_setting(msj)) else: return msj.simbox.name.val
def __set_energy_group(msj, cfg, model): """ """ default_energy_group_setting = sea.Map(""" name = "$JOBNAME$[_replica$REPLICA$]_enegrp.dat" first = 0.0 interval = 1.2 self_energy = false corr_energy = true """) desmond_exec = get_exec(msj, cfg) if (isinstance(msj.energy_group, sea.Atom)): if (msj.energy_group.val): msj["energy_group"] = default_energy_group_setting __set_energy_group(msj, cfg, model) else: remove_plugin(desmond_exec, "energy_groups") else: custom_energy_group_setting = msj.energy_group msj.energy_group = default_energy_group_setting msj.energy_group.update(custom_energy_group_setting) desmond_exec.plugin.energy_groups.name.val = msj.energy_group.name.val desmond_exec.plugin.energy_groups.first.val = msj.energy_group.first.val desmond_exec.plugin.energy_groups.interval.val = msj.energy_group.interval.val eg = desmond_exec.plugin.energy_groups def set_options(term): if (term in msj.energy_group): if (msj.energy_group[term].val): eg.options.append(term) else: try: eg.options.remove(term) except ValueError: pass set_options("self_energy") set_options("corr_energy") # energy_groups slow down the simulation add_plugin(desmond_exec, "energy_groups") # if we use the energy groups plugin, we need to tell GCMC that it must # restore energy groups. we can do this whether or not GCMC is actually # enabled (ie added to plugin.list) if 'gcmc' in desmond_exec.plugin: desmond_exec.plugin.gcmc.restore_engrps.val = True def __set_pressure_tensor(msj, cfg, model): """ Set ptensor plugin and corresponding variables from MSJ pressure_tensor block. """ default_settings = sea.Map(""" name = "$JOBNAME$[_replica$REPLICA$].ptensor.gz" first = 0.0 interval = 1.2 print_volume = false """) desmond_exec = get_exec(msj, cfg) if (isinstance(msj.pressure_tensor, sea.Atom)): if (msj.pressure_tensor.val): msj["pressure_tensor"] = default_settings __set_pressure_tensor(msj, cfg, model) else: remove_plugin(desmond_exec, "ptensor") else: custom_settings = msj.pressure_tensor msj.pressure_tensor = default_settings msj.pressure_tensor.update(custom_settings) desmond_exec.plugin.ptensor.name.val = msj.pressure_tensor.name.val desmond_exec.plugin.ptensor.first.val = msj.pressure_tensor.first.val desmond_exec.plugin.ptensor.interval.val = msj.pressure_tensor.interval.val desmond_exec.plugin.ptensor.print_volume.val = msj.pressure_tensor.print_volume.val add_plugin(desmond_exec, "ptensor") def __set_dipole_moment(msj, cfg, model): """ Set dipole moment plugin and corresponding variables from MSJ block. """ default_settings = sea.Map(""" first = 0.0 interval = 1.2 name = "$JOBNAME$[_replica$REPLICA$]_dipole.dat" """) desmond_exec = get_exec(msj, cfg) if (isinstance(msj.dipole_moment, sea.Atom)): if (msj.dipole_moment.val): msj["dipole_moment"] = default_settings __set_dipole_moment(msj, cfg, model) else: remove_plugin(desmond_exec, "dipole_moment") else: custom_settings = msj.dipole_moment msj.dipole_moment = default_settings msj.dipole_moment.update(custom_settings) desmond_exec.plugin.dipole_moment.name.val = msj.dipole_moment.name.val desmond_exec.plugin.dipole_moment.first.val = msj.dipole_moment.first.val desmond_exec.plugin.dipole_moment.interval.val = msj.dipole_moment.interval.val add_plugin(desmond_exec, "dipole_moment")
[docs]def get_energy_group_output_filename(msj): """ Returns 'None' if the energy_groups plugin is turned off. """ if (isinstance(msj.energy_group, sea.Atom)): if (msj.energy_group.val): return get_energy_group_output_filename(get_default_setting(msj)) else: return msj.energy_group.name.val if ( msj.time.val > msj.energy_group.first.val) else None
[docs]def get_fep_output_filename(msj): """ """ if (not is_minimize(msj)): try: if (msj.time.val > msj.fep.output.first.val): return msj.fep.output.name.val except AttributeError: pass
# For minimization def __set_max_steps(msj, cfg, model): """ """ cfg.minimize.maxsteps.val = msj.max_steps.val def __set_convergence(msj, cfg, model): """ """ cfg.minimize.tol.val = msj.convergence.val def __set_steepest_descent_steps(msj, cfg, model): """ """ cfg.minimize.sdsteps.val = msj.steepest_descent_steps.val def __set_num_vector(msj, cfg, model): """ """ cfg.minimize.m.val = msj.num_vector.val def __set_backend(msj, cfg, model): """ """ if ("backend" in msj and isinstance(msj.backend, sea.Map)): cfg.update(msj.backend.dval) # For remd
[docs]def gen_temperature_for_solute_tempering(setting, model, base_temp, asl, printer=None): """ """ from . import remd if (model is None): print( "WARNING: Cannot predict REMD temperature ladder without model system." ) return [ 300, 400, 500, 600, 700, 800, ] if ("highest_temperature" in setting and "exchange_probability" in setting): top_temp = setting.highest_temperature.val temp, prob = remd.predict_with_temp_and_exch(( base_temp, top_temp, ), setting.exchange_probability.val, model, asl) if ("highest_temperature" in setting and "n_replica" in setting): top_temp = setting.highest_temperature.val temp, prob = remd.predict_with_temp_and_nreplica(( base_temp, top_temp, ), setting.n_replica.val, model, asl) if ("exchange_probability" in setting and "n_replica" in setting): temp, prob = remd.predict_with_nreplica_and_exch( setting.n_replica.val, setting.exchange_probability.val, base_temp, model, asl)[0] if (printer): printer("Temperature ladder (exchange probability):") for t, p in zip(temp, prob): printer(" %.3g (%.3g)" % ( t, p, )) printer(" %.3g (n/a)" % temp[-1]) return temp
[docs]def gen_replica(msj, model=None): """ """ replica = msj.replica generator = replica.generator.val if (generator in [ "solute_tempering", "parallel_tempering", ]): ref_temp = msj.temperature.val if (isinstance( msj.temperature, sea.Atom)) else msj.temperature[0][0].val ref_temp = float(ref_temp) asl = replica.atom.val if (generator == "solute_tempering") else "all" if (isinstance(replica.temperature, sea.Map)): rep_temp = gen_temperature_for_solute_tempering( replica.temperature, model, ref_temp, asl) else: rep_temp = replica.temperature.val replica = sea.List() if (ref_temp not in rep_temp): print( "WARNING: Reference temperature (%f) not found in replicas (%s)" % ( ref_temp, str(rep_temp), )) if (generator == "solute_tempering"): rep_fname = sea.Atom( "$JOBNAME_solute_tempering_replica%d-in.cms").val for i, e in enumerate(rep_temp): fname = rep_fname % i if (model): from . import remd remd.write_ff_solute_tempering(model, fname, asl, old_div(ref_temp, e)) replica.append(sea.Map("model_file = %s" % fname)) else: for e in rep_temp: replica.append(sea.Map("temperature = %.3g model_file = ?" % e)) else: raise ValueError("Unknown replica generator: %s" % replica.generator.val) return replica
def __set_replica(msj, cfg, model): """ """ msj = deepcopy(msj) replica_msj = [] if (is_remd(msj)): replica = msj.replica if (isinstance(replica, sea.Map)): replica = gen_replica(msj, model) if (not isinstance(replica, sea.List)): raise ValueError("Unknown type for 'replica' parameter: %s" % type(replica)) for e in replica: msj_ = deepcopy(msj) msj_.update(e) replica_msj.append(msj_) return ( msj, replica_msj, )
[docs]def get_replica_setting(msj): """ """ ref, replica = __set_replica(msj, None, None) return replica
def __set_model_file(msj, cfg, model): """ """ if ("model_file" in msj): desmond_exec = get_exec(msj, cfg) desmond_exec.plugin.maeff_output["bootfile"] = msj.model_file.val def __attach_msj(msj, cfg, model): """ """ cfg["ORIG_CFG"] = msj def __set_restrain(msj, cfg, model): # only concatenated stages need this pass def __set_restraints(msj, cfg, model): # only concatenated stages need this pass def __set_box(msj, cfg, model): pass
[docs]def concatenate_cfgs(simulate_cfgs, base_cfg): """ Create a single backend config file from a list of simulate configs created from the simulate block of a concatenate front-end config and the backend config created from the rest of the concatenate front-end config. :param simulate_cfgs: The list of backend config files created from the simulate block of a concatenate front-end config :type simulate_cfgs: `list` of `sea.Map` :param base_cfg: The backend config file created from a concatenate front-end config, with its simulate block removed. :type base_cfg: `sea.Map` :return: A copy of the base concatenate config with the information from the simulate configs merged in. :rtype: `sea.Map` """ base_cfg = deepcopy(base_cfg) integrator = base_cfg.integrator concat = sea.Map() concat.sequence = sea.List() first_restrain = None posre_scaling = None for i, simulate_cfg in enumerate(simulate_cfgs): # we get a copy of the integrator that was modified and add it to the # concatenator simulate_integrator = simulate_cfg.integrator this_integrator_type = simulate_integrator.type.val this_integrator = simulate_integrator[this_integrator_type] this_integrator_name = 'Stage_{}_'.format(i) + this_integrator_type # copy default values from config's integrator level to each # integrator-specific level for cfg_attr in [ 'dt', 'respa', 'temperature', 'pressure', 'thermostat', 'barostat', 'posre_scaling' ]: if cfg_attr in simulate_integrator: this_integrator[cfg_attr] = simulate_integrator[cfg_attr] restraints = simulate_cfg.restraints # not a valid part of backend config, just temporary attribute del simulate_cfg.restraints first_restrain, posre_scaling = set_relative_restrain_scaling( first_restrain, posre_scaling, restraints, this_integrator) # This updates values in base_cfg to be equal to the value of the last # simulate cfg on which it is set. # TODO This allows silent conflicts to be overwritten unseen. We should # replace this with consistency checks, or refactor and use the # consistency checks from cmj.get_concat_stages here # Note: This will overwrite mdsim.last_time, which will # be restored below. for key in list(simulate_cfg): if key not in ['integrator', 'ORIG_CFG', 'boot']: base_cfg[key].update(simulate_cfg[key]) concat[this_integrator_name] = this_integrator desmond_exec = get_exec(simulate_cfg.ORIG_CFG, simulate_cfg) this_time = desmond_exec.last_time.val # The concatenate stage runs cyclically. # If the total concatenate stage simulation time is # the sum of the sub-stage times, the last step will be # one step run for the first sub-stage. This can cause # stability problems (DESMOND-8606). Modify the last # stage so it runs a little longer, preventing this issue. if i == len(simulate_cfgs) - 1: # Must be at least 1 timestep, larger is fine. # Only 1 timestep of this is run, the rest is cut-off # by the last_time. this_time += 1.0 sequence_info = sea.Map( dict(name=this_integrator_name, time=this_time, type=this_integrator_type)) concat.sequence.append(sequence_info) integrator.Concatenator = concat integrator.type.val = 'Concatenator' # Restore the total time, which was overwriten in the update above. __set_time(base_cfg.ORIG_CFG, base_cfg, None) return base_cfg
[docs]def set_relative_restrain_scaling(first_restrain, previous_posre_scaling, restrain, this_integrator): """ Set each integrator's posre_scaling attribute to the ratio between the desired restraint force constant and the first force constant. This allows us to modulate the restraint force constant between concatenate stages. :param first_restrain: the restrain parameter from the first stage's config, or None if updating the first stage :type first_restrain: `sea.Map` :param previous_posre_scaling: posre_scaling parameter from the previous stage's config, or None if unknown :type previous_posre_scaling: float or NoneType :param restrain: this stage's restrain parameter :type restrain: `sea.Map` :param this_integrator: this stage's integrator :type this_integrator: `sea.Map` :return: the first restrain parameter and posre scaling factor """ if first_restrain is None: assert previous_posre_scaling is None if restrain.existing.val == 'retain': raise ValueError('We cannot retain non-existing restraints') first_restrain = restrain first_fc = restrain_force_constant(first_restrain) if 'posre_scaling' in this_integrator: # should never be set prior to here to anything but one assert this_integrator.posre_scaling.val == 1. if restrain != first_restrain: if not first_fc: raise ValueError('We cannot change the restraints unless the first' 'restraint has a non-zero force constant') if restrain.existing.val == 'retain': assert previous_posre_scaling is not None posre_scaling = previous_posre_scaling else: this_fc = restrain_force_constant(restrain) if this_fc: msg = cmj.check_restrain_diffs(restrain.new, first_restrain.new) if msg: raise ValueError(msg) posre_scaling = this_fc / first_fc else: posre_scaling = 1. this_integrator['posre_scaling'] = posre_scaling return first_restrain, posre_scaling
[docs]def restrain_force_constant(restraints: sea.Map) -> float: """ Get restraints force constant. :param restraints: the restraints block for a given stage :return: the (first) restraint force constant """ if restraints.new: # we can take the first here because we only allow lists when # all restrain blocks are equal or none r = restraints.new[0] if 'fc' in r: return r.fc.val elif 'force_constant' in r: return r.force_constant.val else: return r.force_constants.val else: return 0
def __optimize_cfg(msj, cfg, box: List[float] = None): """ Update the cfg settings based on the simulation box. If box is None, do nothing. """ if box is not None: optimize_key(msj, cfg, box) # FIXME: replace optimize_key with forceconfig. def __clean_cfg(cfg): """ """ app_map = { "mdsim": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "remd", "vrun", "reinit", ], "vrun": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "remd", "mdsim", "reinit", ], "reinit": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "remd", "mdsim", "vrun", ], "remd": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "minimize", "vrun", "mdsim", "reinit", ], "minimize": [ "PLUGIN_COMMON", "CHECKPT_COMMON", "remd", "vrun", "mdsim", "reinit", ], } for e in app_map[cfg.app.val]: if (e in cfg): del cfg[e] # Deletes unused plugin settings. desmond_exec = cfg[cfg.app.val] used_plugin = desmond_exec.plugin.list.val used_plugin += [ "list", ] # for e in desmond_exec.plugin : # if (e not in used_plugin) : # del desmond_exec.plugin[e] def __fix_cfg(msj, cfg): """ """ # Special treatment for annealing if ("annealing" in msj and msj.annealing.val): ens_type = cfg.integrator.type.val ens = cfg.integrator[ens_type] del cfg.integrator[ens_type] cfg.integrator.type.val = "anneal" cfg.integrator["anneal"] = sea.Map("type = " + ens_type) cfg.integrator.anneal[ens_type] = ens # Special treatment for force.nonbonded.far if (cfg.force.nonbonded.far.type.val == "none"): cfg.force.nonbonded["far"] = "none" # Adjusts the remd.interval and remd.deltaE.interval value to be dividible by migration.interval. DESMOND-3600 def justify_interval(val, againstee, description=""): try: old_val = val.val val.val = (int(old_div(val.val, againstee) - 1E-7) + 1) * againstee # Prints out warning. DESMOND-5196 print( "NOTE: Adjusted %s to be dividible by migration.interval: %g => %g" % ( description, old_val, val.val, )) except OverflowError: pass try: mi_interval_list = [] for replica in cfg.remd.graph.values(): try: mi_interval_list.append(replica.migration.interval.val) except AttributeError: mi_interval_list.append(cfg.migration.interval.val) mi_interval = min(mi_interval_list) for replica in cfg.remd.graph.values(): try: replica.migration.interval.val = mi_interval except AttributeError: # This exception happens when the replica uses the default migration.interval setting and doesn't have its # own migration.interval data field. pass cfg.migration.interval.val = mi_interval justify_interval(cfg.remd.first, mi_interval, "cfg.remd.first") justify_interval(cfg.remd.interval, mi_interval, "cfg.remd.interval") justify_interval(cfg.remd.deltaE.first, mi_interval, "cfg.remd.deltaE.first") justify_interval(cfg.remd.deltaE.interval, mi_interval, "cfg.remd.deltaE.interval") except AttributeError: pass def __msj2cfg_helper(msj, cfg, model, macro): """ """ sea.set_macro_dict(macro) cfg = sea.Map(DEFAULT_CONFIG) if (cfg is None) else cfg cfg = cfg.dval this_mod = globals() for k in list(msj): if (k not in [ "replica", "backend", ]): this_mod["__set_" + k](msj, cfg, model) return cfg
[docs]def msj2cfg(msj, cfg, model, macro=None): """ """ if 'simulate' in msj: # assumed to be a concatenate config simulate_cfgs = [] for msj_ in msj.simulate: simulate_cfg = msj2cfg(msj_, deepcopy(cfg), model, macro) simulate_cfg.restraints = cmj.get_restraints_xor_convert_restrain( msj_) simulate_cfgs.append(simulate_cfg) base_concat_msj = deepcopy(msj) del base_concat_msj['simulate'] base_concat_cfg = msj2cfg(base_concat_msj, deepcopy(cfg), model, macro) return concatenate_cfgs(simulate_cfgs, base_concat_cfg) macro = macro or {} msj_ref, msj_replica = __set_replica(msj, cfg, model) cfg_ref = __msj2cfg_helper(msj_ref, deepcopy(cfg), model, macro) cfg_replica = [] for i, e in enumerate(msj_replica): macro["$REPLICA"] = str(i) cfg_replica.append(__msj2cfg_helper(e, deepcopy(cfg), model, macro)) cfg_ref.app.val = get_exec_name(msj) __optimize_cfg(msj_ref, cfg_ref, model and model.box) __set_backend(msj_ref, cfg_ref, model) cfg_ref = cfg_ref.bval cfg_rep = [] for m, c in zip(msj_replica, cfg_replica): c.app.val = get_exec_name(msj) if m.model_file.val: if m.has_key('box') and m.box.val is not None: # noqa: W601 # Use cached box info if available box = m.box.val else: # Use StructureReader as it is faster than cms.Cms rep_ct = StructureReader.read(m.model_file.val) box = cms.get_box(rep_ct) __optimize_cfg(m, c, box) else: __optimize_cfg(m, c, model and model.box) __set_backend(m, c, model) cfg_rep.append(c.bval) cfg_diff = [] for e in cfg_rep: changed, referred, added, lost = sea.diff(e, cfg_ref) change = changed.update(added) if ("mdsim" in change): del change["mdsim"] if ("minimize" in change): del change["minimize"] if ("vrun" in change): del change["vrun"] if ("reinit" in change): del change["reinit"] cfg_diff.append(change) for i, e in enumerate(cfg_diff): if cfg_ref.remd.graph.val.get("T%02d" % i): cfg_ref.remd.graph["T%02d" % i].update(e) else: cfg_ref.remd.graph["T%02d" % i] = e cfg_ref.remd.graph.edges.nodes.append("T%02d" % i) __clean_cfg(cfg_ref) __fix_cfg(msj, cfg_ref) __attach_msj(msj, cfg_ref, model) return cfg_ref
if ("__main__" == __name__): import sys msj = sea.Map(open(sys.argv[1], "r").read()) model = cms.Cms(file=sys.argv[2]) cfg = msj2cfg(msj, None, model, { "$JOBNAME": "test", }) print(cfg) #optimize_key( cfg, model )