Source code for schrodinger.application.desmond.stage.prepare.forcefield

import base64
import copy
import glob
import json
import math
import os
import shutil
import string
import warnings
from collections import defaultdict
from itertools import chain
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from pathlib import Path

import numpy as np

from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fep_schedule
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.constants import FEP_ENCODED_RESTRAINTS
from schrodinger.application.desmond.constants import FEP_RESTRAIN
from schrodinger.application.desmond.constants import FEP_RESTRAINT_CORRECTION
from schrodinger.application.desmond.stage import simulate
from schrodinger.application.desmond.stage.jobs import DesmondJob
from schrodinger.application.desmond.stage.utils import SystemBuilder
from schrodinger.application.desmond.stage.utils import check_cms_file
from schrodinger.application.desmond.stage.workflow import \
    PREDEFINED_STAGE_FAMILY
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils.analyze import evaluate_asl
from schrodinger.adapter import evaluate_smarts
from schrodinger.utils import fileutils
from schrodinger.utils import sea
from schrodinger.utils import subprocess

Restraints = List[sea.Map]

__all__ = [
    'AssignCustomCharge', 'AssignForcefield', 'AssignLambdaSchedule',
    'LoadRestraintsFromStructure', 'ForcefieldBuilderLauncher'
]


[docs]def is_valid_opls_name(name: str) -> bool: try: mm.opls_name_to_version(name) except IndexError: return False return True
[docs]class AssignCustomCharge(cmj.StructureStageBase): """ This sets up and runs the custom charge script for the ligand cts in the input file. Ligands must be in separate cts, otherwise custom charges will not be assigned. Other cts will be passed unchanged. This must be called prior to the AssignForcefield stage. """ NAME = "assign_custom_charge" PARAM = cmj._create_param_when_needed(""" DATA = { enabled = false mode = %s } VALIDATE = { enabled = {type = bool} mode = {type = enum range = [%s]} } """ % (constants.CUSTOM_CHARGE_MODE.KEEP, " ".join( constants.CUSTOM_CHARGE_MODE)))
[docs] def run(self, jobname: str, mae_fname: str) -> Optional[str]: mode = self.param.mode.val if self.param.enabled.has_tag("setbyuser"): if self.param.enabled.val: mode = constants.CUSTOM_CHARGE_MODE.ASSIGN msg = f'The "enabled = true" option is deprecated. Use "mode = {mode}" instead.' else: mode = constants.CUSTOM_CHARGE_MODE.KEEP msg = f'The "enabled = false" option is deprecated. Use "mode = {mode}" instead.' warnings.warn(msg, DeprecationWarning, stacklevel=2) out_fname = f'{jobname}-out.mae' cts = list(structure.StructureReader(mae_fname)) ligands = [] if mode in [ constants.CUSTOM_CHARGE_MODE.CLEAR, constants.CUSTOM_CHARGE_MODE.ASSIGN ]: # This import is not available in fep scholar. from schrodinger.application.scisol.packages.fep import fepmae _, _, _, ligands = fepmae.filter_receptors_and_ligands(cts) custom_cts = [] for i, ct in enumerate(cts): if mode in [ constants.CUSTOM_CHARGE_MODE.CLEAR, constants.CUSTOM_CHARGE_MODE.ASSIGN ]: # Clear existing custom charges mm.mmffld_removeCustomChargesFromCT(ct.handle) if ct not in ligands: custom_cts.append(ct) continue # For keep and clear, do not generate custom charges if mode in [ constants.CUSTOM_CHARGE_MODE.KEEP, constants.CUSTOM_CHARGE_MODE.CLEAR ]: custom_cts.append(ct) continue # If there is more than one molecule present, skip custom charge assignment if len(ct.molecule) > 1: self._print( "quiet", f"WARNING: More than one molecule present in {ct.title}, " "skipping custom charge assignment.") custom_cts.append(ct) continue # Input filename must be unique per ct. charge_in_fname = "{}_{}-in.mae".format(jobname, i) charge_out_fname = "{}-out.mae".format(jobname) ct.write(charge_in_fname) cmd = [ os.path.join(envir.CONST.SCHRODINGER, "generate_custom_charges"), "-imae", charge_in_fname, "-omae", charge_out_fname, ] # Reference coordinates may not have been set if constants.REFERENCE_COORD_PROPERTIES[0] in ct.atom[1].property: cmd.extend([ '-coord_property_basename', constants.REFERENCE_COORD_BASENAME ]) job = jobcontrol.launch_job(cmd) job.wait() if not job.succeeded() or not os.path.exists(charge_out_fname): self._print("quiet", "ERROR: generate_custom_charges failed:") for log_fname in glob.glob('*.log'): self._print("quiet", f"Log file : {log_fname}") with open(log_fname, 'r') as f: self._print( "quiet", f"Log file content:\n>{'>'.join(f.readlines())}") self._print("quiet", "(end of log file)\n") return None # Add output to list custom_ct = structure.Structure.read(charge_out_fname) custom_cts.append(custom_ct) # Clean up temporary files fileutils.force_rmtree(charge_in_fname.strip('.mae'), ignore_errors=True) fileutils.force_remove(charge_in_fname) fileutils.force_remove(charge_out_fname) # Write output cts, potentially with custom charge. # May be missing if a generate_custom_charges job failed. if len(custom_cts) == len(cts): with structure.StructureWriter(out_fname) as w: for ct in custom_cts: w.append(ct) return out_fname
[docs]class AssignForcefield(cmj.StageBase): """ """ NAME = "assign_forcefield" FFLD_WATER = constants.FFLD_WATER parameter_string = string.Template(""" DATA = { forcefield = OPLS_2005 water = SPC humble = no fepio_mode = 2 restrain = none restraints = { new = [] existing = $restraint_ignore } print_restraint = false atom_group = retain fep_retain_angle = yes core_hopping_fepio = on hydrogen_mass_repartition = off make_alchemical_water = on fep_enhance_sampling_dihedral = off assign_is_infinite = off fail_on_lewis_failure = on use_zob_property = on pose_conf_restraint = { enable = false name = harm schedule = $pose_conf_schedule fc = 50.0 sigma = 10.0 alpha = 1.0 dihedral_schedule = "" } macromolecule_conf_restraint = { enable = false dihedral_schedule = "" $backbone = { name = harm schedule = "" fc = 50.0 sigma = 10.0 alpha = 1.0 } $sidechain = { name = harm schedule = "" fc = 50.0 sigma = 10.0 alpha = 1.0 } $calpha_rung = { name = harm schedule = "" fc = 5.0 alpha = 1.0 } } } VALIDATE = { forcefield = {type = str _check = check_forcefield} water = {type = enum range = [$ffld_water none]} humble = {type = bool} fepio_mode = {type = int range = [1 3]} atom_group = [ {type = enum range = [retain none]} {atom = {type = str} name = {type = str} index = {type = int range = [0 7]} } {type = list size = 0 elem = {atom = {type = str} name = {type = str} index = {type = int range = [0 7]} } } ] restrain = [ {type = enum range = [retain none]} {_mapcheck = check_restrain _skip = all} {type = list size = -1 elem = {_mapcheck = check_restrain _skip = all} } ] restraints = { existing = {type = enum range = [$existing_restraint]} new = [{type = list} {_skip = all}] } print_restraint = {type = bool } fep_retain_angle = {type = bool} core_hopping_fepio = {type = bool} hydrogen_mass_repartition = {type = bool} make_alchemical_water = {type = bool} fep_enhance_sampling_dihedral = {type = bool} assign_is_infinite = {type = bool} fail_on_lewis_failure = {type = bool} use_zob_property = {type = bool} pose_conf_restraint = { enable = {type = bool} name = {type = enum range = [harm fbhw soft]} fc = {type = float+} sigma = {type = float+} alpha = {type = float+} schedule = {type = str} dihedral_schedule = {type = str} _mapcheck = check_pose_conf_restraint } macromolecule_conf_restraint = { enable = {type = bool} dihedral_schedule = {type = str} $backbone = { name = {type = enum range = [harm fbhw soft]} schedule = {type = str} fc = {type = float+} sigma = {type = float+} alpha = {type = float+} } $sidechain = { name = {type = enum range = [harm fbhw soft]} schedule = {type = str} fc = {type = float+} sigma = {type = float+} alpha = {type = float+} } $calpha_rung = { name = {type = enum range = [harm]} schedule = {type = str} fc = {type = float+} alpha = {type = float+} } } } """).substitute(ffld_water=" ".join(FFLD_WATER.keys()), restraint_ignore=constants.EXISTING_RESTRAINT.IGNORE, existing_restraint=" ".join(constants.EXISTING_RESTRAINT), pose_conf_schedule=constants.POSE_DIHEDRAL_RESTRAINT, backbone=constants.ConfRestraintType.BACKBONE, sidechain=constants.ConfRestraintType.SIDECHAIN, calpha_rung=constants.ConfRestraintType.CALPHA_RUNG) PARAM = cmj._create_param_when_needed(parameter_string)
[docs] def crunch(self): """ """ self._print("debug", "In AssignForcefield.crunch") from schrodinger.application.desmond.packages import viparr for pj in self.get_prejobs(): jobname, dir = self._get_jobname_and_dir(pj) fname_in = jobname + "-in.mae" fname_out = jobname + "-out.cms" if (not os.path.isdir(dir)): os.makedirs(dir) util.chdir(dir) util.symlink(pj.output.struct_file(), fname_in) # Constructs the command line for job-launching. if is_valid_opls_name(self.param.forcefield.val): def run(job): if mm.opls_name_to_version(self.param.forcefield.val) < 16: if self.param.water.val in constants.S_OPLS_WATER_MODELS: self._print( "quiet", f"ERROR: {self.param.water.val} water model uses virtual sites and is not compatible with OPLS_2005." ) job.status.set(cmj.JobStatus.BACKEND_ERROR) return util.chdir(job.dir) fname_in = job.jobname + "-in.mae" fname_out = job.jobname + "-out.cms" tmp_fname = f'{job.jobname}-out.mae' all_cts = list( structure.StructureReader(pj.output.struct_file())) for ct in all_cts: cms.Cms.clean_ct_properties(ct) try: water_ct_index = self.get_water_ct_indices(all_cts) except ValueError: job.status.set(cmj.JobStatus.BACKEND_ERROR) return if (water_ct_index != []): water = self.param.water.val if (water.lower() != "none"): for idx in water_ct_index: for a in all_cts[idx].atom: a.pdbres = AssignForcefield.FFLD_WATER[ water.upper()] print(f'Writing {len(all_cts)} structures to {tmp_fname}') with open(tmp_fname, 'w') as fp: # DESMOND-11261 -- write & flush & fsync fp.write('{\n s_m_m2io_version\n :::\n 2.0.0\n}\n\n') fp.write(struc.write_structures(all_cts)) fp.flush() os.fsync(fp.fileno()) with open(tmp_fname, 'r') as fp: print(f'wrote : {len(fp.read())} bytes') # ZOB property is only supported for FFLD 16 use_zob_property = self.param.use_zob_property.val if mm.opls_name_to_version(self.param.forcefield.val) < 16: use_zob_property = False cmd = ["run", "desmond_ff_builder", "--opls_version", self.param.forcefield.val, "--fep_mode", str(self.param.fepio_mode.val), "--fep_retain_angle", str((2 if (self.param.fep_retain_angle.val) else 1)), ] \ + (["--keep_ffio_block", ] if (self.param.humble.val) else []) \ + (["--skip_fepio"] if (self.param.core_hopping_fepio.val) else []) \ + (["--use_zob_prop"] if (use_zob_property) else []) \ + (["--fail_on_lewis_failure"] if (self.param.fail_on_lewis_failure.val) else []) \ + [tmp_fname, fname_out, ] proc_ret = subprocess.call(cmd) if (proc_ret or not os.path.isfile(fname_out)): fileutils.force_remove(fname_out) job.status.set(cmj.JobStatus.BACKEND_ERROR) return model = cms.Cms(file=fname_out) model.sanitize_for_viparr() model.write(fname_out) job.status.set(cmj.JobStatus.SUCCESS) jlaunch_cmd = run else: ffmap = { "CHARMM": [ "-f", "charmm36_lipids", "-f", "charmm36_protein", ], "AMBER": [ "-f", "amber03", ], } try: ff = ffmap[self.param.forcefield.val] except KeyError: ff = [ "-f", self.param.forcefield.val, ] water = self.param.water.val.lower() def run(job, humble=self.param.humble.val, ff=ff, water=water): fname_in = job.jobname + "-in.mae" fname_out = job.jobname + "-out.cms" c_opt = [] if humble: cts = [e for e in cms.ffiostructure.CMSReader(fname_in)] for i, e in enumerate(cts, 1): if (not e.hasFfio()): c_opt.extend([ "-c", str(i), ]) args = c_opt + ff if water != "none": args.extend(["-f", water]) args.extend([fname_in, fname_out + "_vp"]) self._print("debug", "viparr args: " + str(args)) viparr.main(args) if not os.path.isfile(fname_out + "_vp"): job.status.set(cmj.JobStatus.BACKEND_ERROR) return cmd = [ "run", "-FROM", "desmond", "build_constraints.py", fname_out + "_vp", fname_out + "_vp_bc", ] proc_ret = subprocess.call(cmd) if (proc_ret or not os.path.isfile(fname_out + "_vp_bc")): job.status.set(cmj.JobStatus.BACKEND_ERROR) return cmd = [ "run", "desmond_ff_builder", "--fep_mode", str(self.param.fepio_mode.val), "--fep_retain_angle", str((2 if (self.param.fep_retain_angle.val) else 1)), "--keep_ffio_block", fname_out + "_vp_bc", fname_out, ] proc_ret = subprocess.call(cmd) if (proc_ret or not os.path.isfile(fname_out)): job.status.set(cmj.JobStatus.BACKEND_ERROR) return job.status.set(cmj.JobStatus.SUCCESS) jlaunch_cmd = run new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain, pj.permanent_group, jobname, pj, self, jlaunch_cmd, dir) new_job.need_host = False new_job.output.add(os.path.abspath(fname_out), checker=check_cms_file) self.add_job(new_job) self._print("debug", "Out AssignForcefield.crunch")
def _postbuild(self, job): """ """ fname = job.output.struct_file() model = cms.Cms(file=fname) model = SystemBuilder.set_atom_group(model, self.param.atom_group) setting = self.param.restrain if (isinstance(setting, sea.Atom) and setting.val == "retain"): model = SystemBuilder.set_restrain(model, sea.Atom("none"), job.permanent_restrain, job) else: model = SystemBuilder.set_restrain(model, setting, None, job) # when empty set permanent_restrain to None job.permanent_restrain = model.get_restrain() or None job.permanent_group = model.get_atom_group() if (job.permanent_group == []): job.permanent_group = None self._update_maestro_group_and_title(model) if self.param.assign_is_infinite.val: from schrodinger.application.matsci import desmondutils model.remove_cts_property(constants.IS_INFINITE) is_infinite = desmondutils.is_infinite(model) model.set_cts_property(constants.IS_INFINITE, is_infinite) self._print('debug', 'Set %s to %s' % (constants.IS_INFINITE, is_infinite)) titration_ct = model.get_titration_ct() if titration_ct is not None: from schrodinger.application.scisol.packages.lambda_dynamics.interaction_terms import \ generate_titration_states try: generate_titration_states(titration_ct, self.param.forcefield.val) titration_ct.property[constants.DESMOND_WRITE_JSON] = True except (ChildProcessError, FileNotFoundError) as err: self._print('quiet', err) job.status.set(cmj.JobStatus.BACKEND_ERROR) return model.write(fname) (ref_ct, mut_ct) = model.get_fep_cts() if self.param.core_hopping_fepio.val and mut_ct and ref_ct: out_fname = fname + "_" ffld_name = self.param.forcefield.val if not is_valid_opls_name(ffld_name): raise RuntimeError( "ERROR: Core-hopping FEP only works with OPLS force field.") if model.use_titration: import schrodinger.application.scisol.packages.lambda_dynamics.interaction_terms as interaction_terms from schrodinger.application.desmond.packages.restraint import \ b64_encode from schrodinger.application.desmond.packages.restraint import \ get_encoded_restraints from schrodinger.application.desmond.packages.restraint import \ set_encoded_restraints # offset comp_ct index by 1 (starting number for ct) + 1 (first cms ct is always fullsystem) ct_numbers = tuple( model.comp_ct.index(ct) + 2 for ct in (ref_ct, mut_ct)) alchemical_changes = interaction_terms.combine_pairwise_int_match( (ref_ct, mut_ct), opls_version=mm.opls_name_to_version(ffld_name), restraint_fc=10, conf_restraint=self.param.macromolecule_conf_restraint.val, dihedral_sampling=self.param.fep_enhance_sampling_dihedral. val, pose_restraint=self.param.pose_conf_restraint.val, ct_numbers=ct_numbers) pose_conf_restraints = alchemical_changes.pop( constants.CONF_POST_RESTRAINTS, None) ref_ct.property[constants.FEP_ALCHEMICAL_CHANGE] = b64_encode( json.dumps(alchemical_changes)) if pose_conf_restraints is not None: # This should be the first time the restraint property is set in model. if get_encoded_restraints(model): self._print( "quiet", "Warning: existing restraints found, these will be ignored and overwritten." ) set_encoded_restraints( model, b64_encode(pose_conf_restraints.toJson())) ref_ct.property[constants.DESMOND_WRITE_JSON] = True # FEP-stage number used in msys for alchemical changes # Ideally they should be imported from desmond ref_ct.property[constants.FEPIO_STAGE] = 1 mut_ct.property[constants.FEPIO_STAGE] = 2 model.write(out_fname) shutil.move(out_fname, fname) else: import schrodinger.application.scisol.packages.core_hopping.int_fepio as fepio fepio.write_fepio_from_int_map( fname, out_fname, mm.opls_name_to_version(ffld_name), conf_restraint=self.param.macromolecule_conf_restraint.val, dihedral_sampling=self.param.fep_enhance_sampling_dihedral. val, pose_restraint=self.param.pose_conf_restraint.val) shutil.move(out_fname, fname) if cmj.has_explicit_restraints(self.param): if job.permanent_restrain: raise RuntimeError( "ERROR: old-style (permanent) restrain and 'restraints' " "cannot be used simultaneously") if "restrain" in self.param and self.param.restrain.val != "none": raise RuntimeError( "ERROR: 'restrain' and 'restraints' are mutually exclusive") from schrodinger.application.desmond.packages.restraint import \ RestraintBuilder model = cms.Cms(file=fname) builder = RestraintBuilder( restraint_terms=self.param.restraints.new, existing=self.param.restraints.existing.val, cms_sys=model, persistent=True) builder.addRestraints() out_fname = fname + "_restraints" model.write(out_fname) shutil.move(out_fname, fname) if self.param.print_restraint.val: self._print("quiet", builder.getString(compact=True)) if self.param.hydrogen_mass_repartition.val: model = cms.Cms(file=fname) out_cms_filename = fname + "__" for ct in model.comp_ct: ct.ffio.property[ 'b_ffio_hmr'] = self.param.hydrogen_mass_repartition.val model.write(out_cms_filename) shutil.move(out_cms_filename, fname) if self.param.make_alchemical_water.val and mut_ct and ref_ct: fname_alchem_water = fname + "_prealchemical_water.cms" shutil.copyfile(fname, fname_alchem_water) self._make_alchemical_water(fname_alchem_water) shutil.move(fname_alchem_water, fname)
[docs] def hook_captured_successful_job(self, job): """ """ try: self._postbuild(job) except RuntimeError as e: self._print("quiet", e) return "dissolved"
@staticmethod def _update_maestro_group_and_title(model: cms.Cms): component = struc.component_structures(model) subgroup = (component.fep_ref and "FEP: %s" % cmj.ENGINE.jobname or "MD: %s" % cmj.ENGINE.jobname) # yapf: disable struc_iter = struc.struc_iter(component.solute, component.membrane, component.cosolvent) solvent_iter = struc.struc_iter(component.solvent) model.fsys_ct.title = " + ".join(e.title for e in struc_iter) or \ " + ".join(e.title for e in solvent_iter) or \ 'Untitled' for ct in ([model.fsys_ct] + model.comp_ct): ct.property[constants.M_SUBGROUP_TITLE] = ct.property[ constants.M_SUBGROUPID] = subgroup ct.property[constants.M_SUBGROUP_COLLAPSED] = 0 model.synchronize_fsys_ct() @staticmethod def _make_alchemical_water(cms_fname: str): """ Reads a CMS file, and moves one or more water molecules out of the "solvent" component structure(s) into a new component structure whose CT type is `constants.CT_TYPE.VAL.ALCHEMICAL_WATER`. We call this component structure "alchemical water". The number of the moved water molecules is determined by the difference in formal charge between the reference and mutant component structures such that once the alchemical water molecules are mutated to ions the mutant structure would have the same formal charge as the reference. If there is no difference, then there is no need for alchemical water, and this function will have no effects. The moved water molecules are selected such that the oxygen atoms are at least 10 Angstroms away from any atoms in the "solute" component structure. If no such water molecules can be found, this function will try to find water molecules closer to the "solute" atoms, but not closer than 2 angstroms. If still no water molecules can be found, a `ValueError` exception will be raised. The new "alchemical water" component structure is put right after the reference and the mutant component structures. A new `Cms` object will be constructed and is written out, overwriting the original CMS file `cms_fname`. """ cms_model = cms.Cms(file=cms_fname) charge_diff = (cms_model.ref_ct.formal_charge - cms_model.mut_ct.formal_charge) if 0 == charge_diff: return num_water_wanted = abs(charge_diff) bulk_water_oxygen_asl = f"(water and (atom.ele O)) and (beyond %d " \ f"(atom.{constants.CT_TYPE} '{constants.CT_TYPE.VAL.SOLUTE}'))" min_distance_away = 10 water_oxygen_aids = [] while num_water_wanted > len(water_oxygen_aids): if min_distance_away <= 1: raise ValueError("Not enough water molecules to pick for " "alchemical water.") water_oxygen_aids = cms_model.select_atom( bulk_water_oxygen_asl % min_distance_away)[:num_water_wanted] min_distance_away -= 1 # Converts `water_oxygen_aids` to atom indices in the component # structure. atom_index_fix = {} index_delta = 0 for i, comp_ct in enumerate(cms_model.comp_ct): atom_index_fix[i] = index_delta index_delta += comp_ct.atom_total water_oxygen_indices = defaultdict(list) # Key = comp-ct-index, value = atom-index-in-comp-ct for aid in water_oxygen_aids: atom = cms_model.fsys_ct.atom[aid] ct_index = atom.property[ constants.CT_INDEX] - 1 # `CT_INDEX` property is 1-based. water_oxygen_indices[ct_index].append(aid - atom_index_fix[ct_index]) # We will extract the selected water molecules out of their respective # component structures and put them into a new component water # structure. # First, we prepare an empty new structure with the proper ffio block # for water molecules. We achieve that by copying an existing water # structure and deleting all atoms in it. alchem_water_structure = cms_model.comp_ct[ct_index].copy() alchem_water_structure.title = "Alchemical Water" alchem_water_structure.deleteAtoms( range(1, alchem_water_structure.atom_total + 1)) # Then we move the selected water molecules into the new component # structure. alchem_water_structure.ffio.deletePseudos( list(range(1, len(alchem_water_structure.ffio.pseudo) + 1))) alc_pseudo_idx = 1 for ct_index, oxygen_indices in water_oxygen_indices.items(): comp_ct = cms_model.comp_ct[ct_index] atom_indices = evaluate_asl( comp_ct, "fillmol (a.n %s)" % " ".join(map(str, oxygen_indices))) alchem_water_structure.extend( comp_ct.extract(atom_indices, copy_props=True)) comp_ct.deleteAtoms(atom_indices) n_virtuals = len(comp_ct.ffio.virtual) if n_virtuals: del_pseudo = [] for oxygen_idx in oxygen_indices: water_number = (oxygen_idx - 1) // 3 del_pseudo.extend([ water_number * n_virtuals + vir_idx for vir_idx in range(1, n_virtuals + 1) ]) alchem_water_structure.ffio.addPseudos(len(del_pseudo)) for pseudo_idx in del_pseudo: src_pseudo = comp_ct.ffio.pseudo[pseudo_idx] alc_pseudo = alchem_water_structure.ffio.pseudo[ alc_pseudo_idx] (alc_pseudo.x_coord, alc_pseudo.y_coord, alc_pseudo.z_coord) = (src_pseudo.x_coord, src_pseudo.y_coord, src_pseudo.z_coord) alc_pseudo_idx += 1 comp_ct.ffio.deletePseudos(del_pseudo) # Sets a structure-level property `constants.ALCHEMICAL_WATER_CHARGE` to # the desired value (+1 or -1). Eventually in the backend, this value # will become the `chargeB` of the alchemical water's oxygen atoms. alchem_charge = math.copysign(1, charge_diff) alchem_water_structure.property[constants.ALCHEMICAL_WATER_CHARGE] = \ alchem_charge # Puts the new structure right after the FEP structures. alchem_water_structure.property[constants.CT_TYPE] = \ constants.CT_TYPE.VAL.ALCHEMICAL_WATER ct_index = max(cms_model.comp_ct.index(cms_model.ref_ct), cms_model.comp_ct.index(cms_model.mut_ct)) + 1 cms_model.comp_ct.insert(ct_index, alchem_water_structure) cms_model.synchronize_fsys_ct() cms_model.write(cms_fname)
[docs] @staticmethod def get_water_ct_indices(ct_list): water_ct_index = [] for i, ct in enumerate(ct_list): if ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLVENT: # If num component is 1, speed up evaluate_smarts by just # checking first molecule (MATSCI-12463) if ct.property.get(constants.NUM_COMPONENT) == 1: ct = ct.molecule[1].extractStructure() list_of_list = evaluate_smarts(ct, "[H]O[H]", True, 1000000000) atom = list(chain(*list_of_list)) if atom != []: if len(atom) != ct.atom_total: msg = ("ERROR: Failed to set force field parameters " "for solvent CT (%i), which contains mixture of " "water and nonwater molecules. %i does not match" " asl atom count using (SMARTS. \"[H]O[H]\")" % (ct.atom_total, len(atom))) print(msg) raise ValueError(msg) water_ct_index.append(i) return water_ct_index
[docs]class AssignLambdaScheduleError(Exception): pass
[docs]class AssignLambdaSchedule(cmj.StageBase): """ """ NAME = "assign_lambda_schedule" PARAM = cmj._create_param_when_needed([ """ DATA = { } VALIDATE = { } """, ])
[docs] def crunch(self) -> None: """ """ # Read conformation and compute electric dipole moments of the # original and perturbed systems. for pj in self.get_prejobs(): jobname, dir = self._get_jobname_and_dir(pj) if not os.path.isdir(dir): os.makedirs(dir) util.chdir(dir) input_cms = pj.output.struct_file() try: norm_dipole_per_atom = self._compute_norm_dipole_per_atom( input_cms) except AssignLambdaScheduleError: self._print( 'quiet', 'ERROR: The structure was not set up ' 'properly for alchemical FEP simulations') status = cmj.JobStatus.BACKEND_ERROR else: status = cmj.JobStatus.SUCCESS self._print('debug', f'norm_dipole_per_atom: {norm_dipole_per_atom}') new_job = copy.deepcopy(pj) new_job.stage = cmj.weakref.proxy(self) new_job.output = cmj.JobOutput() new_job.need_host = False new_job.status.set(status) new_job.output.set_struct_file(os.path.abspath(input_cms)) self.add_job(new_job) # Compute lambda schedule and update subsequent task stages. if status == cmj.JobStatus.SUCCESS: current_stage = self._NEXT_STAGE while current_stage: if current_stage.NAME in PREDEFINED_STAGE_FAMILY['md']: if current_stage.NAME == 'concatenate': for sim_param in current_stage.param.simulate: sim_stage = simulate.Simulate() sim_stage.param = sim_param if sim_stage.NAME in PREDEFINED_STAGE_FAMILY[ 'md']: self._update_fep_lambda( norm_dipole_per_atom, sim_stage) else: self._update_fep_lambda(norm_dipole_per_atom, current_stage) current_stage = current_stage._NEXT_STAGE
[docs] @staticmethod def get_n_vdw_and_charge_div(norm_dipole_per_atom: float, n_win: int) \ -> Tuple[int, float]: """Compute number of van der Waals windows and fraction of charges. Uses a "broken stick" function to determine the partition of the lambda windows into van der Waals and electrostatic interactions. """ x = norm_dipole_per_atom p1 = np.poly1d([-932.51920742, 94.84779172]) p2 = np.poly1d([-20.80999859, 69.48773835]) x0 = -(p1[0] - p2[0]) / (p1[1] - p2[1]) # Intersection point. alpha0 = p1[0] + p1[1] * x0 alpha1 = (p1[1] + p2[1]) / 2 alpha2 = -(p1[1] - p2[1]) / 2 percent_vdw = (alpha0 + alpha1 * (x - x0) + alpha2 * (x - x0) * np.sign(x - x0)) # The number of van der Waals windows (n_vdw) and the inverse of # the fraction of Coulomb windows (charge_div) are set so that # there always are at least two windows assigned to each # interaction. n_vdw = max(2, int(percent_vdw / 100 * n_win)) charge_div = n_vdw if n_win - n_vdw == 2 else n_win / (n_win - n_vdw) return n_vdw, charge_div
def _update_fep_lambda(self, norm_dipole_per_atom: Tuple[Optional[float], Optional[float]], target_stage: cmj.StageBase) -> None: """Change the lambda schedule for the stage `target_stage`. """ param = target_stage.param fep_lambda = param.fep.lambda_ scheme, n_win = config.parse_lambda(fep_lambda.val) self._print('debug', f'scheme = {scheme!r}\tn_win = {n_win}') if scheme not in {'default', 'charge'}: warnings.warn(f'Scheme {scheme!r} is currently unsupported') return FepSchedule = fep_schedule.get_fep_schedule_class(scheme=scheme) norm_dipole_per_atom_disappearing = norm_dipole_per_atom[0] if norm_dipole_per_atom_disappearing is not None: n_vdwA, charge_divA = self.get_n_vdw_and_charge_div( norm_dipole_per_atom_disappearing, n_win) self._print('debug', f'n_vdwA = {n_vdwA}\tcharge_divA = {charge_divA}') self._log( f'Assigning ({n_vdwA} out of {n_win}) lambda windows ' f'to van der Waals A interactions in stage {target_stage.NAME!r}' ) scheduleA = FepSchedule(n_win, charge_divA) else: scheduleA = FepSchedule(n_win) norm_dipole_per_atom_appearing = norm_dipole_per_atom[1] if norm_dipole_per_atom_appearing is not None: n_vdwB, charge_divB = self.get_n_vdw_and_charge_div( norm_dipole_per_atom_appearing, n_win) self._print('debug', f'n_vdwB = {n_vdwB}\tcharge_divB = {charge_divB}') self._log( f'Assigning ({n_vdwB} out of {n_win}) lambda windows ' f'to van der Waals B interactions in stage {target_stage.NAME!r}' ) scheduleB = FepSchedule(n_win, charge_divB) else: scheduleB = FepSchedule(n_win) param.fep.lambda_ = sea.Map() param.fep.lambda_.vdwA = sea.List(scheduleA.vdwA) param.fep.lambda_.vdwB = sea.List(scheduleB.vdwB) param.fep.lambda_.chargeA = sea.List(scheduleA.chargeA) param.fep.lambda_.chargeB = sea.List(scheduleB.chargeB) param.fep.lambda_.bondedA = sea.List(scheduleA.bondedA) param.fep.lambda_.bondedB = sea.List(scheduleB.bondedB) param.add_tag('setbyuser', propagate=True) def _compute_norm_dipole_per_atom(self, cms_fname: str) \ -> Tuple[Optional[float], Optional[float]]: from schrodinger.application.desmond.packages import analysis from schrodinger.application.desmond.packages import topo class Dipole(analysis.Dipole): """Electric dipole moment of the selected atoms.""" def __init__(self, msys_model, cms_model, aids, charges=None): super().__init__(msys_model, cms_model, aids) if charges: # msys_model.atom(gid).charge always returns zero for the # atoms belonging only to cms_model.mut_ct, so we replace # the zeroed charges by the appropriate values here. self._charges[:len(charges)] = charges def norm_dipole_per_atom(msys_model, cms_model, atoms, charges=None): dipole_analyzer = Dipole(msys_model, cms_model, atoms, charges) frames = [topo.DuckFrame(msys_model)] electric_dipole = analysis.analyze(frames, dipole_analyzer) return np.linalg.norm(electric_dipole) / len(atoms) msys_model, cms_model = topo.read_cms(cms_fname) fep_ct = cms_model.fep_ct if fep_ct is None: raise AssignLambdaScheduleError atommap = fep_ct.fepio.atommap atoms_disappearing = [e.ai for e in atommap if e.aj < 0] atoms_appearing = [e.aj for e in atommap if e.ai < 0] aids_disappearing = [ fsys_atom.index for fsys_atom, comp_atom, comp_ct, ct_index in topo.cms_atom( cms_model) if comp_ct == cms_model.ref_ct and comp_atom.index in atoms_disappearing ] aids_appearing = [ fsys_atom.index for fsys_atom, comp_atom, comp_ct, ct_index in topo.cms_atom( cms_model) if comp_ct == cms_model.mut_ct and comp_atom.index in atoms_appearing ] if len(aids_disappearing) > 2: norm_dipole_per_atom_disappearing = norm_dipole_per_atom( msys_model, cms_model, aids_disappearing) else: norm_dipole_per_atom_disappearing = None if len(aids_appearing) > 2: site = fep_ct.ffio.site charges_appearing = [site[i].charge for i in atoms_appearing] norm_dipole_per_atom_appearing = norm_dipole_per_atom( msys_model, cms_model, aids_appearing, charges_appearing) else: norm_dipole_per_atom_appearing = None return norm_dipole_per_atom_disappearing, norm_dipole_per_atom_appearing
[docs]def encode_restraints(ct: structure.Structure, restraints: Restraints): """ Encode the restraints in the `FEP_ENCODED_RESTRAINTS` ct property. :param ct: Structure to modify in place. :param restraints: List of restraints as sea.Map objects. """ encoded_restraint = base64.b64encode(str(restraints).encode()) ct.property[FEP_ENCODED_RESTRAINTS] = encoded_restraint.decode()
[docs]def decode_restraints(ct: structure.Structure) -> Restraints: """ Decode the restraints in the `FEP_ENCODED_RESTRAINTS` ct property. :param ct: Structure to read the restraints from. :return: If found, restraints that can be passed to `schrodinger.application.desmond.packages.restraint.RestraintsBuilder`. Otherwise, return `None`. """ encoded_restraints = ct.property.get(FEP_ENCODED_RESTRAINTS) if encoded_restraints is not None: new_restraints = base64.b64decode(encoded_restraints).decode() return sea.Map("new = " + new_restraints).new
[docs]def clear_restraints(ct: structure.Structure): """ Clear the `FEP_ENCODED_RESTRAINTS` ct property and `FEP_RESTRAIN` atom property from a given structure. :param ct: Structure to modify in place. """ struc.delete_structure_properties(ct, FEP_ENCODED_RESTRAINTS) struc.delete_atom_properties(ct, FEP_RESTRAIN)
[docs]def add_restraint_reference(st: structure.Structure, restraints: Restraints): """ Add the reference values for Boresch type distance/angle/dihedral restraints. :param st: Reference structure. :param restraints: List of restraints, updated in place. """ # Can be given as aid or asl, so convert to aid if needed. convert_atom = lambda a: a if isinstance(a, int) else evaluate_asl( st, a.split('asl:')[-1].strip())[0] for restraint in restraints: atoms = restraint.atoms.val ref_value = st.measure(*[convert_atom(a) for a in atoms]) ref_name = '' if len(atoms) == 2: ref_name = 'r0' elif len(atoms) == 3: ref_name = 'theta0' elif len(atoms) == 4: ref_name = 'phi0' else: continue for ref_ext in 'AB': ref_key = f'{ref_name}{ref_ext}' if ref_key not in restraint: restraint[ref_key] = ref_value
[docs]def calculate_restraint_correction_term(restraints: Restraints, temperature: float) -> float: """ Calculate the correction to the free energy due to the restraints. :param restraints: List of restraints. :param temperature: The temperature for the simulation. :return: The correction term in kcal/mol. """ # Convert restraint format converted_restraints = [] for restraint in restraints: atoms = restraint.atoms.val k = max(restraint.force_constants.val) ref = None if len(atoms) == 2: if hasattr(restraint, 'r0A'): ref = restraint.r0A.val elif hasattr(restraint, 'r0'): ref = restraint.r0.val elif len(atoms) == 3: if hasattr(restraint, 'theta0A'): ref = restraint.theta0A.val elif hasattr(restraint, 'theta0'): ref = restraint.theta0.val elif len(atoms) == 4: if hasattr(restraint, 'phi0A'): ref = restraint.phi0A.val elif hasattr(restraint, 'phi0'): ref = restraint.phi0.val if ref is None: print( f'Could not find reference value for restraint {restraint}, skipping this term for the correction.' ) continue converted_restraints.append(fepana.Restraint(atoms, ref, k)) return fepana.calc_free_energy_for_abfe_cross_link_xu( converted_restraints, temperature)
[docs]def add_restraint_atom_marker(st: structure.Structure, ligand_asl: str) -> Dict[int, int]: """ Update `st` to mark the ligand atoms that could be part of a restraint. Returns a dictionary mapping the `FEP_RESTRAIN` values to the structure atom indicies. :param st: This will be updated in place to add atom properties to mark the ligand atoms. :param ligand_asl: ASL to identify the ligand. """ ligand_aids = sorted(evaluate_asl(st, ligand_asl)) restraint_to_atom_idx = {} for i, aid in enumerate(ligand_aids): # This is used to create restraints that are independent # of the absolute atom index. Number the ligand atoms # as if the ligand was by itself. st.atom[aid].property[FEP_RESTRAIN] = i + 1 restraint_to_atom_idx[i + 1] = aid return restraint_to_atom_idx
[docs]class LoadRestraintsFromStructure(cmj.StructureStageBase): """ Load the restraints encoded in the structure using `encode_restraints` and store to the cms. By default this stage will append to any existing restraints, set 'load_restraints_from_structure.existing = ignore' to ignore existing restraints in the structure. The restraints can be used by setting 'restrain.existing = retain' in the subsequent simulate stage. """ NAME = "load_restraints_from_structure" PARAM = cmj._create_param_when_needed([ """ DATA = { override = "" temperature = 300.0 existing = %s } VALIDATE = { override = {type = list size = 0} temperature = {type = float+} existing = {type = enum range = [%s]} } """ % (constants.EXISTING_RESTRAINT.RETAIN, ' '.join( constants.EXISTING_RESTRAINT)) ])
[docs] def run(self, jobname: str, cms_fname: str) -> Optional[str]: from schrodinger.application.desmond.packages.restraint import \ RestraintBuilder model = cms.Cms(file=cms_fname) has_restraints = any(decode_restraints(ct) for ct in model.comp_ct) for ct in model.comp_ct: new_restraints = decode_restraints(ct) if self.param.override.val: # Need to add restraint correction to same ct that has the # restraint. if has_restraints and new_restraints is None: continue new_restraints = self.param.override # Users can specify restraints without the reference # values set, so determine the values here. add_restraint_reference(model.fsys_ct, new_restraints) dg_correction = calculate_restraint_correction_term( new_restraints, self.param.temperature.val) ct.property[FEP_RESTRAINT_CORRECTION] = dg_correction self._print( "quiet", f"Updated restraint correction term: {dg_correction}") if new_restraints is not None: # This will update the model in place restraint_builder = RestraintBuilder(new_restraints, self.param.existing.val, model) restraint_builder.addRestraints() self._print( "quiet", restraint_builder.getString(skip_tables={'posre_harm'}, compact=True)) break out_fname = f"{jobname}-out.cms" model.write(out_fname) return out_fname
[docs]class ForcefieldBuilderLauncher(cmj.StageBase): NAME = "forcefield_builder_launcher" PARAM = cmj._create_param_when_needed([ """ DATA = { structure_file = "" additional_args = [] host = "localhost" output_opls_name = "$MAINJOBNAME-out.opls" } VALIDATE = { additional_args = {type = list size = 0} structure_file = {type = str1} host = {type = str1} output_opls_name = {type = str1} } """ ])
[docs] def crunch(self): self._print("debug", "In ForcefieldBuilderLauncher.crunch") for pj in self.get_prejobs(): jobname, jobdir = self._get_jobname_and_dir(pj) jobpath = Path(jobdir) new_job = cmj.Job(jobname, pj, self, None, jobdir) self.add_job(new_job) if not jobpath.exists(): jobpath.mkdir() # Set up input structure structure_path = Path(self.param.structure_file.val).absolute() util.chdir(jobdir) shutil.copyfile(structure_path, structure_path.name) # Set up command cmd = [ 'ffbuilder', '-HOST', 'localhost', '-SUBHOST', self.param.host.val, structure_path.name, '-JOBNAME', 'ffbuilder' ] + self.param.additional_args.val # Propagate input opls if oplsdir := os.getenv(mm.OPLS_DIR_ENV): cmd += ['-OPLSDIR', oplsdir] self._print("debug", f'Launching job: {cmd}') job = jobcontrol.launch_job(cmd) job.wait() # Copy back output files out_opls_path = Path('ffbuilder_oplsdir') if not job.succeeded(): self._print("quiet", "ERROR: forcefield_builder failed:") if not out_opls_path.exists(): self._print("quiet", f"Could not find output path {out_opls_path}.") if not job.succeeded(): self._print("quiet", "Job failed.") for log_fname in glob.glob('*.log'): self._print("quiet", f"Log file : {log_fname}") with open(log_fname, 'r') as f: log_contents = '>'.join(f.readlines()) self._print("quiet", f"Log file content:\n>{log_contents}") self._print("quiet", "(end of log file)\n") new_job.status.set(cmj.JobStatus.BACKEND_ERROR) return if out_opls_path.exists(): self._print( "quiet", "The following OPLSDIR will be used for all fep_plus " "calculations and copied back to the launch " f"directory: {self.param.output_opls_name.val}") opls_copy_path = Path(cmj.ENGINE.base_dir, self.param.output_opls_name.val) # Find the opls file in the output out_fname = next(out_opls_path.glob('*.opls')) shutil.copy(out_fname, opls_copy_path) self._files4copy.append(opls_copy_path) # Reset the OPLS_DIR env var os.environ[mm.OPLS_DIR_ENV] = str(opls_copy_path.absolute()) else: self._print( "quiet", "No additional parameters determined by the " "Force Field Builder workflow.") # Pass the output since this is the input for the next stage new_job.status.set(cmj.JobStatus.SUCCESS) new_job.output = copy.deepcopy(pj.output) return self._print("debug", "out ForcefieldBuilderLauncher.crunch")