Source code for schrodinger.application.desmond.stage.utils

import copy
import os
import pickle
import tempfile
import time
from itertools import repeat
from pathlib import Path
from types import SimpleNamespace
from typing import List
from typing import Optional
from typing import Union

import numpy as np

from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import fepana
from schrodinger.application.desmond import gconfig
from schrodinger.application.desmond import system_builder_inp as sbi
from schrodinger.application.desmond import util
from schrodinger.application.desmond.constants import RestrainTypes
from schrodinger.test import ioredirect
from schrodinger.utils import sea
from schrodinger.utils import subprocess
from schrodinger.utils.fileutils import TEMP
from schrodinger.utils.fileutils import get_directory_path

from .jobs import DesmondJob
from .jobs import FepJob


[docs]def check_cms_file(fname): """ """ try: fsys_ct = None comp_ct = [] struc_reader = structure.StructureReader(fname) for st_ in struc_reader: if (st_.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.FULL_SYSTEM): if fsys_ct is not None: return f'More than one "{constants.CT_TYPE.VAL.FULL_SYSTEM}" CTs were found' fsys_ct = st_ else: comp_ct.append(st_) except KeyError: return f'One CT did not have the "{constants.CT_TYPE}" property' except: return "Reading the .cms file failed. The file might be corrupt" if fsys_ct is None: return f'"{constants.CT_TYPE.VAL.FULL_SYSTEM}" CT was not found' if not comp_ct: return "Component CT(s) was not found" return None
[docs]def check_box_size(model, msj_content, return_params=False): """ Check that the model box is big enough for desmond to run. :type model: cms.Cms or structure.Structure :param model: Desmond model or structure. For structure chorus properties must be set :param str msj_content: Contents of the simulate block (X in simulate {X}). Or for the concatenate: concatenate {simulate = [{X}]} Only some parameters are used to determine r_lazy: temp, ensemble, etc. see gconfig.msj2cfg :param bool return_params: If True, return also adjusted box and r_lazy :rtype: bool or tuple :return: True or False if box is big enough. If return_params=True, also return adjusted box size and r_lazy :raise ValueError: If msj is concatenate stage (checked with is_concat) """ from schrodinger.application.matsci.nano import xtal if not isinstance(model, cms.Cms): # Create a fake model from structure box = list(xtal.get_chorus_properties(model)) model = SimpleNamespace(box=box, comp_ct=[model]) # Get r_lazy msj = sea.Map(msj_content) if gconfig.is_concat(msj): raise ValueError('msj_content appears to be a concatenate stage, which ' 'this function does not support.') # Get default CFG msj_full = copy.deepcopy(gconfig.get_default_setting(msj)) # Ensure that there are only MD keys, required by gconfig.msj2cfg for key in msj.keys(): if key not in msj_full.keys(): msj.del_key(key) msj_full.update(msj) with ioredirect.IOSilence(): ret_cfg = gconfig.msj2cfg(msj_full, None, model) r_lazy = ret_cfg.force.nonbonded.r_lazy.val # Ported from desmond-gpu-src/base/src/engine/home.cxx :: configure_global_cell np_box = np.array(model.box).reshape((3, 3)) ibox = np.linalg.inv(np_box) inverse_unit_sphere = np.array([np.linalg.norm(x) for x in ibox.T]) # Ported from desmond-gpu-src/base/src/mdlib/near_terms/nobone.cxx :: estimate_grid_size adj_box_size = 1. / inverse_unit_sphere if any(r_lazy > x / 2. for x in adj_box_size): return (False, adj_box_size, r_lazy) if return_params else False return (True, adj_box_size, r_lazy) if return_params else True
[docs]def run_stage_method_in_parallel(method): """ Decorator to run a stage method in parallel. The first argument to the method must be a list, which will be split into `cmj.ENGINE.maxjob` jobs. If the first argument contains a single element, or maxjob is 1, then the method is called directly. The decorated method must return a list of results. It must NOT alter the stage state in any way, because the stage state is not propagated back. :raise RuntimeError: If the subprocess did not run successfully. """ def wrapped(self, input_list: List, *args, **kwargs) -> List: start_time = time.time() num_input = len(input_list) engine = cmj.ENGINE maxjob = engine.maxjob if num_input > 1 and maxjob > 1 and not os.getenv( 'SCHRODINGER_DESMOND_DISABLE_PARALLEL_CMS'): # Prepare input args list_idx = list(range(num_input)) batch_by_idx = np.array_split(list_idx, maxjob) batch_list = [ [input_list[j] for j in batch_by_idx[i]] for i in range(maxjob) ] input_args = { 'engine': engine.serialize_bytes(), 'stage_idx': self._INDEX, 'macro_dict': sea.get_macro_dict(), 'method': method.__name__, 'args': args, 'kwargs': kwargs, 'stage_attributes': self.__dict__, 'param': self.param, 'setbyuser_keys': self.param.keys(tag='setbyuser'), 'loglevel': cmj.GENERAL_LOGLEVEL, } cmd_base = [ f'{os.getenv("SCHRODINGER")}/run', str(Path(__file__).parent / 'run_method_parallel.py') ] with tempfile.TemporaryDirectory( dir=get_directory_path(TEMP)) as tmp_dir: # Loop through and run each batch procs = [] for i, batch in enumerate(filter(len, batch_list)): # Save input pkl input_args['batch'] = batch input_args['out_pkl'] = str( (Path(tmp_dir) / f'tmp_{i}-out.pkl').absolute()) pkl_fname = Path(tmp_dir) / f'tmp_{i}.pkl' log_fname = f'{pkl_fname}.log' with open(pkl_fname, 'wb') as f: pickle.dump(input_args, f) # Run the command cmd = cmd_base + [str(pkl_fname.absolute())] # PIPE will block when full causing the procs to run seriallly, # so use a file here instead. log_fh = open(log_fname, 'w') proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=log_fh) procs.append((proc, input_args['out_pkl'], log_fh)) # Collate the results results = [] for i, (proc, pkl_fname, log_fh) in enumerate(procs): proc.wait() # Close log and print contents log_fh.flush() log_fh.close() with open(log_fh.name, 'r') as f: print(f.read()) # Check the return code if proc.returncode != 0: raise RuntimeError(f'Error running {method.__name__}') with open(pkl_fname, 'rb') as f: results.extend(pickle.load(f)['results']) return results else: return method(self, input_list, *args, **kwargs) return wrapped
# this class includes static/class methods that are used elsewhere. Refactor # those and then deprecate this class
[docs]class SystemBuilder(cmj.StageBase): """ """ NAME = "system_builder" SOLVENT = { "SPC": "spc.box.mae", "SPCFW": "spcfw.box.mae", "TIP4P": "tip4p.box.mae", "TIP4PEW": "tip4pew.box.mae", "TIP3P": "tip3p.box.mae", "TIP5P": "tip5p.box.mae", "TIP4PD": "tip4pd.box.mae", "SPCE": "spce.box.mae", "TIP4P2005": "tip4p2005.box.mae" } SB_CMD = os.path.join(envir.CONST.SCHRODINGER_UTILITIES, "system_builder") PARAM = cmj._create_param_when_needed(""" DATA = { csb_file = "" distil_solute = true solvate_system = true neutralize_system = true assign_forcefield = true buffer_width = 10.0 ion_awayfrom = [] ion_awaydistance = 10.0 rezero_system = true minimize_volume = false box_shape = cubic solvent = SPC fepio_mode = 2 fep_retain_angle = yes restrain = none atom_group = retain } VALIDATE = { csb_file = {type = str _check = multisim_file_exists} distil_solute = [ {type = bool} {type = str } ] solvate_system = {type = bool} neutralize_system = {type = bool} assign_forcefield = {type = bool} buffer_width = [ {type = float range = [0.0 inf]} {type = list size = 3 elem = {type = float range = [0.0 inf]}} ] ion_awayfrom = {type = list size = 0 elem = {type = int1}} ion_awaydistance = {type = float range = [0.0 inf]} rezero_system = {type = bool} minimize_volume = {type = bool} box_shape = {type = enum range = [cubic orthorhombic]} solvent = {type = enum range = [SPC SPCFW TIP4P TIP4PEW TIP3P TIP5P TIP4PD SPCE TIP4P2005]} fepio_mode = {type = int range = [1 3]} fep_retain_angle = {type = bool} atom_group = [ {type = enum range = [retain none]} {atom = {type = str} name = {type = str} index = {type = int range = [0 255]} } {type = list size = 0 elem = {atom = {type = str} name = {type = str} index = {type = int range = [0 7]} } } ] restrain = [ {type = enum range = [retain none]} {_mapcheck = check_restrain _skip = all} {type = list size = -1 elem = {_mapcheck = check_restrain _skip = all} } ] } """)
[docs] @staticmethod def merge_restrain(model: cms.Cms, param: sea.Map, new_restr: List[cms.RestrainGroupContainer], old_restr: List[cms.RestrainGroupContainer]): """ Create restrain arrays/objects corresponding to the settings in `param` and merge them into `new_restr`. If the 'ref' or 'reference_pos' field of `param` equals 'retain', the reference positions from any matching restraints in `old_restr` are copied to the created restrain objects before they are merged into `new_restr`. :param model: the input cms model. It is not modified. :param param: The restrain parameters. 'atom', 'fc' or 'force_constant', 'ref' or 'reference_position', and 'sigma' are allowed :param new_restr: The collection of restrains that the new restrains will be merged into. :param old_restr: The collection of restrains to pull any corresponding reference positions from if 'ref' or 'reference_position' field == 'retain'. """ def update_ref(r, old_restrain, ref): if ref == 'retain': old_restrain.retain_ref_single(r) elif ref is not None: r.ref = ref def ensure_atoms_from_same_ct(*atom_idxs): n = len(atom_idxs) m = len(atom_idxs[0]) b = -1 for i in range(m): c = 0 for a in atom_idxs: if a[i] != []: c += 1 if c == n and b == -1: b = i elif c != 0: raise ValueError( "atoms of internal restraint are from different CTs") if b == -1: raise ValueError( "atoms of internal restraint are not specified") return b import schrodinger.structutils.measure as measure def peek(param_, name): return param_[name].val if name in param_ else None ref = peek(param, "ref") or peek(param, "reference_position") or None fc = peek(param, "fc") or param.force_constant.val sigma = peek(param, "sigma") num_atom = 1 if isinstance(param.atom, sea.Atom) else len(param.atom) if num_atom not in (1, 2, 3, 4): raise ValueError(f"length of setting setting restrain.atom" f" must be between 1 and 4 - value is " f"{num_atom}") if num_atom == 1: restrain_type = RestrainTypes.POS_FBHW if sigma else RestrainTypes.POS def make_array(atom_idxs, fc, ref): if sigma: return cms.FBHWPosRestrainArray(atom_idxs, fc, ref, sigma) else: return cms.PosRestrainArray(atom_idxs, fc, ref) # position fbhw or position harmonic restraints atom_idxs = model.select_atom_comp(param.atom.val if ( isinstance(param.atom, sea.Atom)) else param.atom[0].val) ct_idx = 1 if ref is None or ref in ['reset', 'retain']: for ct, restr_ct, old_restr_ct, ct_atom_idxs in zip( model.comp_ct, new_restr, old_restr, atom_idxs): xyz = ct.getXYZ( copy=False)[np.array(ct_atom_idxs, dtype=int) - 1] to_merge = make_array( list(zip(repeat(ct_idx), ct_atom_idxs)), fc, xyz) if ref == "retain": to_merge.retain_refs(old_restr_ct[restrain_type]) restr_ct.merge_group(restrain_type, to_merge) ct_idx += 1 else: for ct, restr_ct, ct_atom_idxs in zip(model.comp_ct, new_restr, atom_idxs): num_atoms = len(ct_atom_idxs) ref_arr = np.array(ref) if ref_arr.size < num_atoms * 3: # copy here or else we update the positions # initialize ref_2d with xyz in case ref_arr is not long # enough ref_2d = ct.getXYZ( copy=True)[np.array(ct_atom_idxs, dtype=int) - 1] # TODO can we fail here instead? Previous version # suppressed this error print( "WARNING: restrain reference array does not " "contain enough reference positions for the input," " using existing positions for remaining atoms.") else: ref_2d = np.empty(shape=(num_atoms, 3), dtype=float) ref_2d.flat[:ref_arr.size] = ref_arr to_merge = make_array( list(zip(repeat(ct_idx), ct_atom_idxs)), fc, ref_2d) restr_ct.merge_group(restrain_type, to_merge) ct_idx += 1 else: sigma = 0.0 if (sigma is None) else sigma def get_atoms_and_ct(param_: sea.Map): all_ais = [ model.select_atom_comp(atom.val) for atom in param_.atom ] ct_index = ensure_atoms_from_same_ct(all_ais[0], all_ais[1]) ais = [all_ai[ct_index] for all_ai in all_ais] return ct_index, model.comp_ct[ct_index], ais def prepend_ct_index(ct_index, atom_indices): return list(zip(repeat(ct_index), atom_indices)) if num_atom == 2: restrain_type = RestrainTypes.STRETCH_FBHW ct_index, ct, (i, j) = get_atoms_and_ct(param) # FIXME: calculate the center of mass of i and j, and calculate the distance between the two coms. ref_ = measure.measure_distance(ct.atom[i[0]], ct.atom[j[0]]) r = cms.Restrain([ prepend_ct_index(ct_index + 1, i), prepend_ct_index(ct_index + 1, j) ], fc, ref_, sigma) elif num_atom == 3: restrain_type = RestrainTypes.ANGLE_FBHW ct_index, ct, (i, j, k) = get_atoms_and_ct(param) ref_ = measure.measure_bond_angle(ct.atom[i[0]], ct.atom[j[0]], ct.atom[k[0]]) r = cms.Restrain( prepend_ct_index(ct_index + 1, [i[0], j[0], k[0]]), fc, ref_, sigma) else: restrain_type = RestrainTypes.IMPROPER_FBHW ct_index, ct, (i, j, k, l) = get_atoms_and_ct(param) ref_ = measure.measure_dihedral_angle(ct.atom[i[0]], ct.atom[j[0]], ct.atom[k[0]], ct.atom[l[0]]) r = cms.Restrain( prepend_ct_index(ct_index + 1, [i[0], j[0], k[0], l[0]]), fc, ref_, sigma) update_ref(r, old_restr[ct_index][restrain_type], ref) new_restr[ct_index].merge_single(restrain_type, r)
# For absolute binding FEP calculations. Key is fepid, and value is a list of # sea.Map objects CROSS_LINK = {}
[docs] @staticmethod def gen_restraint(model, setting, job): """ Generates restraints. """ if ("generator" in setting): if (setting.generator.val == "abfe_cross_link"): fepid = job.fepid if (fepid in SystemBuilder.CROSS_LINK): r = SystemBuilder.CROSS_LINK[fepid] else: if (not isinstance(job, FepJob)): raise RuntimeError( "ERROR: Attempted to create cross-link restraints for non-FEP jobs." ) model_fname = job.output.struct_file() traj_fname = job.output.get("TRJ") lig_atom = model.select_atom( f"atom.{constants.FEP_ABSOLUTE_LIGAND} 1") if (lig_atom == []): raise RuntimeError( "ERROR: Ligand molecule not found. Failed to set cross-link restraints\n" " for absolute binding FEP calculations.\n" f"NOTE: Is atom property '{constants.FEP_ABSOLUTE_LIGAND}' set to 1 for the ligand\n" " molecule?") r = fepana.get_abfep_cross_link(model, lig_atom, 4.0, traj_fname) try: k = setting.fc.val except AttributeError: try: k = setting.force_constant.val except AttributeError: raise RuntimeError( "ERROR: Force constants for abfe_cross_link restraints not set." ) if (3 != len(k)): raise RuntimeError( "ERROR: Force constants for abfe_cross_link restraints not set correctly." ) fc_prefactor = (job.n_win - job.i_win - 1) / (job.n_win - 1) ks, ka, kd = (fc_prefactor * k_i for k_i in k) restrain_params, restrains = [], [] def make_restr_sea(atom_ids: List, fc, ref_val): fsys_ct_1_based_index = 1 restrains.append( cms.Restrain( list(zip(repeat(fsys_ct_1_based_index), atom_ids)), fc, ref_val)) atom_str = " ".join(map(int, atom_ids)) restrain_params.append( sea.Map(f"atom = [{atom_str}] " f"fc = {fc:f} ref = {ref_val:f}")) # stretch make_restr_sea([r.A, r.a], ks, r.Aa[0]) # angle make_restr_sea([r.B, r.A, r.a], ka, r.BAa[0]) make_restr_sea([r.A, r.a, r.b], ka, r.Aab[0]) # torsion make_restr_sea([r.B, r.A, r.a, r.b], kd, r.BAab[0]) make_restr_sea([r.C, r.B, r.A, r.a], kd, r.CBAa[0]) make_restr_sea([r.A, r.a, r.b, r.c], kd, r.Aabc[0]) SystemBuilder.CROSS_LINK[fepid] = r job.cross_link = restrains return restrain_params elif (setting.generator.val == "alpha_helix"): try: helix_asl = setting.atom.val if (helix_asl[:4].lower() != "asl:"): helix_asl = "asl:" + helix_asl except AttributeError: helix_asl = 'asl:res.sec helix' try: k = setting.fc.val except AttributeError: k = [0.25, 1.5, 1.5] try: sigma = setting.sigma.val except AttributeError: sigma = ["?"] * 3 try: ref = setting.ref.val except AttributeError: ref = ["?"] * 3 cfg = sea.Map(("raw = [%s]" % cms.gen_alpha_helix_restraint( model, helix_asl, k, sigma, ref))) return [e for e in cfg.raw] else: return [setting]
[docs] @staticmethod def set_restrain( model: cms.Cms, setting: Union[sea.List, sea.Map], permanent_restr: Optional[List[cms.RestrainGroupContainer]] = None, job=None): """ Set ffio_restrain block in cms model given the 'restrain' setting and any existing permanent restraints :param model: the cms model to add restraints to :param setting: the 'restrain' setting, as a sea.Map. :param permanent_restr: any optional 'permanent' restrain that is passed along with the job object and persists throughout the workflow. It can :param job: the job object """ if (isinstance(setting, sea.Atom)): if (setting.val == "none"): model.clear_restrain() if (permanent_restr is not None): model.set_restrain(permanent_restr) elif (setting.val != "retain"): raise ValueError() else: old_restr = model.get_restrain() new_restr = ( [cms.RestrainGroupContainer() for _ in model.comp_ct] if (permanent_restr is None) else copy.deepcopy(permanent_restr)) model.clear_restrain() s = [] if (isinstance(setting, sea.Map)): s += SystemBuilder.gen_restraint(model, setting, job) elif (isinstance(setting, sea.List)): for e in setting: s += SystemBuilder.gen_restraint(model, e, job) for e in s: SystemBuilder.merge_restrain(model, e, new_restr, old_restr) model.set_restrain(new_restr) return model
[docs] @staticmethod def merge_atom_group(model, group_key): """ """ group_key = group_key if (isinstance(group_key, sea.List)) else [ group_key, ] for g in group_key: model.merge_atom_group_from_asl(g.atom.val, g.name.val, g.index.val)
[docs] @staticmethod def set_atom_group(model, group, permanent_group=None): """ """ if (isinstance(group, sea.Atom)): if (group.val == "none"): model.set_atom_group(permanent_group) elif (group.val != "retain"): raise ValueError() else: model.set_atom_group(permanent_group) if (isinstance(group, sea.Map)): SystemBuilder.merge_atom_group(model, group) elif (isinstance(group, sea.List)): for e in group: SystemBuilder.merge_atom_group(model, e) model.synchronize_fsys_ct() return model
[docs] def __init__(self, *arg, **kwarg): """ """ cmj.StageBase.__init__(self, *arg, **kwarg) # self._csb = sbi.SystemBuilderInput()
# __init__ def _set_csb(self): """ """ param = self.param csb = self._csb awaydist = param.ion_awaydistance.val def _csb_file(param): fname = param.val if (fname != ""): try: fh = open(fname, "r") s = fh.read() fh.close() except IOError: raise IOError("Reading the .csb file failed: %s" % fname) csb.read_text(s) def _buffer_width(param): bw = param if (isinstance(bw, sea.List)): csb.a, csb.b, csb.c = bw[0].val, bw[1].val, bw[2].val else: csb.a, csb.b, csb.c = bw.val, bw.val, bw.val self._set("csb_file", _csb_file) self._set("buffer_width", _buffer_width) self._set( "solvent", lambda param: csb.setSolvent(SystemBuilder.SOLVENT[param.val.upper( )])) self._set("solvate_system", lambda param: csb.setSolvate(param.val)) self._set("neutralize_system", lambda param: csb.setNeutralize(param.val)) self._set( "box_shape", lambda param: csb.setBoundaryCondition(boundary_condition=param.val, use_buffer=1)) self._set("ion_awayfrom", lambda param: csb.setExcludeLocation(param.val, awaydist)) # csb.setSoluteAlignment( "rezero", param.rezero_system.val ) csb.setMMFepioMapModeOption(param.fepio_mode.val) csb.setMMFepioRetainAngleOption((2 if (param.fep_retain_angle.val) else 1)) def _postbuild(self, job): """ """ fname = job.output.struct_file() model = cms.Cms(file=fname) model = SystemBuilder.set_atom_group(model, self.param.atom_group) setting = self.param.restrain if (isinstance(setting, sea.Atom) and setting.val == "retain"): model = SystemBuilder.set_restrain(model, sea.Atom("none"), job.permanent_restrain, job) else: model = SystemBuilder.set_restrain(model, setting, None, job) # when empty set permanent_restrain to None job.permanent_restrain = model.get_restrain() or None job.permanent_group = model.get_atom_group() if (job.permanent_group == []): job.permanent_group = None model.write(fname)
[docs] def crunch(self): """ """ self._print("debug", "In SystemBuilder.crunch") self._log("WARNING: system_builder does not " "generate forcefield anymore, please " "use build_geometry and assign_forcefield") if (self.param.assign_forcefield.val): self._set_csb() for pj in self._pre_job: jobname, dir = self._get_jobname_and_dir(pj) fname_in = jobname + "-in.mae" fname_in2 = jobname + "-in.csb" fname_out = jobname + "-out.cms" fname_log = jobname + "-in.log" if (not os.path.isdir(dir)): os.makedirs(dir) util.chdir(dir) output = pj.output.struct_file() util.symlink(output, fname_in) self._csb.setSolute(fname_in) self._csb.setWriteMaeFile(fname_out) # Separates solvent from the solute CT. distil_solute = self.param.distil_solute.val if (distil_solute): self._csb.fixSolute() if (isinstance(distil_solute, int)): self._csb.treatSolventFromSolute() else: if (distil_solute[:4].lower() == "asl:"): self._csb.treatSolventFromSolute( asl=distil_solute[4:]) else: raise ValueError( "ASL expression should start with 'asl:'") self._csb.write(fname_in2) # Constructs the command line for job-launching. jlaunch_cmd = [ SystemBuilder.SB_CMD, "-HOST", "localhost", fname_in2, ] if (self.param.minimize_volume.val): jlaunch_cmd.append("-minimize_volume") if (self.param.rezero_system.val): jlaunch_cmd.append("-rezero") self._print("debug", " %s" % subprocess.list2cmdline(jlaunch_cmd)) new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain, pj.permanent_group, jobname, pj, self, jlaunch_cmd, dir) new_job.need_host = False new_job.host.name = "localhost" new_job.output.add(os.path.abspath(fname_out), checker=check_cms_file) new_job.output.add(os.path.abspath(fname_log)) self._job.append(new_job) else: self._pas_job += self._pre_job self._pre_job = [] self._print("debug", "Out SystemBuilder.crunch")
[docs] def hook_captured_successful_job(self, job): """ """ try: self._postbuild(job) except RuntimeError as e: self._print("quiet", e) return "dissolved"