Source code for schrodinger.application.desmond.stage.launcher

import copy
import glob
import os
from collections import defaultdict
from itertools import chain
from pathlib import Path
from pathlib import PurePosixPath
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import license as desmond_license
from schrodinger.application.desmond import membrane_utils
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger import structure
from schrodinger.utils import license
from schrodinger.utils import sea

if TYPE_CHECKING:
    from schrodinger.application.scisol.packages.fep import graph

__all__ = ['Multisim', 'FepLauncher', 'FepMembraneLauncher']

GRAPH_TAG = "GRAPH"


# TODO rename this class and its internal strings to Launcher/launcher
class Multisim(cmj.StageBase):
    """
    This stage launches a set of subjobs for each job in _pre_jobs.

    The `job` param is a list of job commands, optionally containing sea
    macros. Each job command will be called with each input file as an
    additional argument.
    """

    NAME = "multisim"
    RESTARTABLE = True

    PARAM = cmj._create_param_when_needed("""
    DATA = {
        job      = []
        restart  = {}
        compress = ""
    }
    VALIDATE = {
        job     = {type = list size = 0}
        restart = {_skip = all}
    }
    """)

    RELAY_KEYS = ["RETRIES", "DEBUG", "debug", "verbose"]

    def _get_value(self, i: int, argv: sea.List) -> Union[str, List]:
        next_arg = argv[i + 1].val
        if isinstance(next_arg, str) and next_arg[0] == '-':
            raise ValueError(f"ERROR: No value for '{argv[i].val}' flag")
        self._skip_arg = True
        return next_arg

    def _get_relay_args(self) -> List[str]:
        relay_arg = []
        for key, value in cmj.ENGINE.relay_arg.key_value():
            if key in self.RELAY_KEYS:
                key = "-" + key
                if isinstance(value, list):
                    for e in value:
                        relay_arg.extend([key, str(e.val)])
                elif isinstance(value.val, bool):
                    relay_arg.append(key)
                else:
                    relay_arg.extend([key, str(value.val)])
        return relay_arg

    def _fail_if_license_not_available(self, lic: str) -> bool:
        """
        Check for the availability of the license `lic`. If it is not
        available, add a failed job and return True.

        :param lic: License to check.
        :return: True if the check failed.
        """
        fail = False
        if not license.is_license_available(lic):
            self._print(
                "quiet",
                f"ERROR: The {license.LICENSE_NAMES.get(lic)} license is "
                "required to run this workflow.")
            fail = True
        if desmond_license.has_model2_file() \
                and not desmond_license.is_model2_server_available():
            self._print(
                "quiet",
                "ERROR: The FEP+ Model 2 configuration file and access to "
                "the server are required to run this workflow.")
            fail = True
        if fail:
            failed_job = cmj.Job('job', None, self, [], '.')
            failed_job.need_host = False
            failed_job.status.set(cmj.JobStatus.PERMANENT_LICENSE_FAILURE)
            self.add_job(failed_job)
            return True
        return False

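    # A hedged, illustrative sketch (not taken from a shipped protocol) of how
    # the `job` param might appear in an .msj file. Each inner list is one job
    # command; plain strings are passed through to the assembled command line,
    # and every command must carry -JOBNAME, since _make_new_cmd looks the
    # final jobname up right after that flag:
    #
    #     multisim {
    #         job = [
    #             ["-JOBNAME" "$MAINJOBNAME_sub" "-m" "sub.msj"]
    #         ]
    #     }
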
    def crunch(self):
        self._print("debug", f"In {self.__class__.__name__}.crunch")
        self._print("debug", f"relay_arg: {self._get_relay_args()}")
        new_jobs = self._crunch_prejobs(self.get_prejobs())
        if new_jobs:
            self.add_jobs(new_jobs)
        self._print("debug", f"Out {self.__class__.__name__}.crunch")

    def _crunch_prejobs(self, pre_jobs):
        new_jobs = []
        for pj in pre_jobs:
            orig_jobname, jobdir = self._get_jobname_and_dir(pj)
            if not os.path.isdir(jobdir):
                os.makedirs(jobdir)
            util.chdir(jobdir)
            fname_in = pj.output.struct_file()
            for job_idx, job_arg_list in enumerate(self.param.job):
                jobname = self._gen_unique_jobname(orig_jobname)
                cmd, jobname = self._make_new_cmd(job_arg_list, jobname,
                                                  fname_in)
                self._print("debug", f"cmd: {cmd}")
                new_jobs.append(
                    self._make_new_job(jobname, pj, cmd, jobdir, fname_in,
                                       job_idx))
        return new_jobs

    def _make_new_cmd(self, job_arg_list: sea.List, jobname: str,
                      fname_in: str) -> Tuple[List[str], str]:
        cmd, job_arg_list = self._get_cmd_base(job_arg_list)
        for arg in job_arg_list:
            if isinstance(arg, sea.Map):
                with open(jobname + ".cfg", "w") as fh:
                    fh.write(str(arg))
                cmd.append(jobname + ".cfg")
            elif isinstance(arg, sea.List):
                with open(jobname + ".msj", "w") as fh:
                    for name, stage in zip(arg[::2], arg[1::2]):
                        fh.write(f"{name} {{\n{stage.__str__(' ')}}}\n\n")
                cmd.append(jobname + ".msj")
            else:
                cmd.append(str(arg.val))
        cmd.extend(["-ALT_DIR", cmj.ENGINE.base_dir])
        cmd.append(fname_in)
        jobname = cmd[cmd.index("-JOBNAME") + 1]
        return cmd, jobname

    def _get_cmd_base(self,
                      job_arg_list: sea.List) -> Tuple[List[str], sea.List]:
        if (isinstance(job_arg_list[0].val, str) and
                job_arg_list[0].val.startswith("$SCHRODINGER/")):
            cmd = [job_arg_list[0].val]
            job_arg_list = job_arg_list[1:]
        else:
            cmd = ["$SCHRODINGER/utilities/multisim"]
            cmd += self._get_relay_args()
        return cmd, job_arg_list

    def _make_new_job(self, jobname: str, pj: cmj.Job, cmd: List[str],
                      jobdir: str, fname_in: str, job_idx: int) -> cmj.Job:
        """
        :param job_idx: The index of the job in the input job/dispatch param
        """
        new_job = cmj.Job(
            jobname,
            pj,
            self,
            cmd,
            jobdir,
            what=f"multisim subjob: {fname_in} #{job_idx:2d}",
            err_handler=cmj.JobErrorHandler.restart_for_backend_error)
        new_job.output.add(os.path.abspath(jobname + "_multisim.log"))
        output_struct = None
        for e in reversed(cmd):
            if e == "-o":
                break
            output_struct = e
        if output_struct:
            new_job.output.add(os.path.join(jobdir, output_struct))
        return new_job

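    # For orientation, a minimal sketch of a command that _make_new_cmd could
    # assemble from the illustrative `job` entry above (the jobname, relay
    # args, and file names are all hypothetical):
    #
    #     $SCHRODINGER/utilities/multisim -RETRIES 3 \
    #         -JOBNAME myjob_sub -m sub.msj \
    #         -ALT_DIR <base_dir> input.mae
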
    def restart_subjobs(self, jobs: List[cmj.Job]):
        for job in jobs:
            self._get_jobname_and_dir(job.parent)
            job.status.set(cmj.JobStatus.WAITING)
            job.err_handler = cmj.JobErrorHandler.restart_for_backend_error
            util.chdir(job.dir)
            cmd, jobname = self._get_restart_job_cmd(job)
            self._print("debug", f"cmd: {cmd}")
            # Removes old output files.
            job.output = cmj.JobOutput()
            output_struct = None
            for e in reversed(cmd):
                if e == "-o":
                    break
                output_struct = e
            job.jlaunch_cmd = cmd
            job.jobname = jobname
            job.output.add(os.path.abspath(jobname + "_multisim.log"))
            if output_struct:
                job.output.add(os.path.join(job.dir, output_struct))
        self.add_jobs(jobs)

    def _get_restart_job_cmd(self, job: cmj.Job) -> Tuple[List[str], str]:
        orig_jobname = job.jobname
        _, job_arg_list = self._get_job_idx_and_args(job)
        restart_stage_number = self._get_restart_stage_number(job)
        if restart_stage_number != "from-scratch":
            cpt_fname = orig_jobname + "-multisim_checkpoint"
            if os.path.isfile(cpt_fname):
                return self._get_restart_from_checkpoint_cmd(
                    job_arg_list, orig_jobname, cpt_fname,
                    restart_stage_number)
            else:
                self._print("quiet",
                            "WARNING: Multisim checkpoint file not found.")
                self._print("quiet", f"WARNING: dir : {job.dir}")
                self._print("quiet", f"WARNING: file: {cpt_fname}")
                self._print("quiet",
                            "WARNING: Will restart this job from scratch.")
        return self._make_new_cmd(job_arg_list, orig_jobname,
                                  job.parent.output.struct_file())

    def _get_job_idx_and_args(self, job: cmj.Job) -> Tuple[int, sea.List]:
        # Corresponds to job_idx in _make_new_job.
        job_idx = int(job.what.split('#')[-1])
        job_arg_list = self.param.job[job_idx]
        return job_idx, job_arg_list

    def _get_restart_stage_number(self, job: cmj.Job) -> Optional[int]:
        # First try the {jobname = restart_number} format.
        for k, v in self.param.restart.val.items():
            if k == job.jobname:
                restart_stage_number = v
                break
        else:
            # Otherwise try the {protocol_name = [restart_numbers]} format.
            job_idx, _ = self._get_job_idx_and_args(job)
            simulation_protocol_name = getattr(job, "simulation_protocol_name",
                                               "default")
            try:
                restart_stage_number = \
                    self.param.restart[simulation_protocol_name][job_idx].val
            except (IndexError, KeyError):
                self._print(
                    "quiet",
                    f"WARNING: Stage number not specified for restarting {str(job)}"
                )
                restart_stage_number = None
        return restart_stage_number

    def _get_restart_from_checkpoint_cmd(
            self, job_arg_list: sea.List, jobname: str, cpt_fname: str,
            restart_stage_number: int) -> Tuple[List[str], str]:
        cmd, job_arg_list = self._get_cmd_base(job_arg_list)
        cmd += [
            "-RESTART",
            cpt_fname +
            (f":{restart_stage_number}" if restart_stage_number else ""),
        ]
        for fname in glob.glob(f"{jobname}*-out.tgz"):
            cmd += ["-d", fname]
        cmd += [str(arg.val) for arg in job_arg_list]
        cmd += ["-ALT_DIR", cmj.ENGINE.base_dir]
        jobname = cmd[cmd.index("-JOBNAME") + 1]
        return cmd, jobname

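    # Two hedged examples of the `restart` param formats probed by
    # _get_restart_stage_number (the jobname, protocol name, and stage numbers
    # below are illustrative):
    #
    #     restart = {myjob_sub_1 = 5}    # {jobname = restart_number}
    #     restart = {default = [5]}      # {protocol_name = [restart_numbers]}
    #
    # A restart value of "from-scratch" bypasses the checkpoint lookup in
    # _get_restart_job_cmd and rebuilds the command with _make_new_cmd.
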
    def collect_inputfile(
            self, job_arg_lists: Optional[sea.List] = None) -> List[str]:
        """
        Return a list of input fnames for the given job commands. If
        `job_arg_lists` is passed in, use it instead of the `job` param
        associated with this stage. This allows subclasses to override the
        list of job commands used.

        :param job_arg_lists: List of job commands to scan for input files.
        :return: The filenames needed to run this stage.
        """
        all_fnames = []
        job_arg_lists = job_arg_lists or self.param.job
        for job_arg_list in job_arg_lists:
            job_fnames = {
                "-ADD_FILE": [],
                "-d": [],
                "-set": [],
            }
            self._skip_arg = False
            for i, arg in enumerate(job_arg_list):
                if self._skip_arg:
                    self._skip_arg = False
                elif arg.val in ["-RESTART", "-r"]:
                    job_fnames["-RESTART"] = self._get_value(i, job_arg_list)
                elif arg.val == "-c":
                    job_fnames["-c"] = self._get_value(i, job_arg_list)
                elif arg.val == "-m":
                    job_fnames["-m"] = self._get_value(i, job_arg_list)
                elif arg.val == "-d":
                    job_fnames["-d"].append(self._get_value(i, job_arg_list))
                elif arg.val == "-ADD_FILE":
                    job_fnames["-ADD_FILE"].append(
                        self._get_value(i, job_arg_list))
            for fname in job_fnames.values():
                if isinstance(fname, list):
                    all_fnames.extend(fname)
                else:
                    all_fnames.append(fname)
        return all_fnames

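    # Sketch of what collect_inputfile gathers from one illustrative job
    # command (the file names are hypothetical):
    #
    #     ["-JOBNAME" "x" "-m" "x.msj" "-d" "prev-out.tgz" "-ADD_FILE" "extra.dat"]
    #
    # yields "x.msj", "prev-out.tgz", and "extra.dat": the values of any
    # -m/-c/-RESTART/-r flags plus every -d and -ADD_FILE value.
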
    def _check_partial_success(self):
        failed_jobs = self.filter_jobs(failed=[True], old=[False])
        successful_jobs = self.filter_jobs(status=[cmj.JobStatus.SUCCESS],
                                           old=[False])
        if not successful_jobs:
            return False
        # Some jobs were successful.
        self._print(
            "quiet", f"\nStage {self._INDEX} partially completed. "
            f"{len(failed_jobs)} subjobs failed, "
            f"{len(successful_jobs)} subjobs done.\n")
        return True


class FepLauncher(Multisim):
    """
    This stage launches a set of subjobs for each job in _pre_jobs.

    The `dispatch` param is a map from simulation protocol to a list of job
    commands, optionally containing sea macros. The edge matching the
    structure file of each pre-job determines which list of job commands is
    used. Each job command from that list is then called with the input file
    as an additional argument.
    """

    NAME = "fep_launcher"

    PARAM = cmj._create_param_when_needed("""
    DATA = {
        dispatch  = {}
        compress  = ""
        skip_legs = []
    }
    VALIDATE = {
        dispatch  = {_skip = all}
        skip_legs = {type = list size = 0}
    }
    """)

    def _crunch_prejobs(self, pre_jobs):
        if self._fail_if_license_not_available(license.FEP_GPGPU):
            return

        from schrodinger.application.scisol.packages.fep.graph import Graph

        new_jobs = []
        g = None
        for pj in pre_jobs:
            orig_jobname, jobdir = self._get_jobname_and_dir(pj)
            if not os.path.isdir(jobdir):
                os.makedirs(jobdir)
            util.chdir(jobdir)
            fname_in = pj.output.struct_file()
            fmp_fname = self._get_fmp_fname(pj)
            if g is None:
                g = Graph.deserialize(fmp_fname)
            edge = self._get_edge(pj, g)
            simulation_protocol_name = (edge.simulation_protocol or
                                        constants.SIMULATION_PROTOCOL.DEFAULT)
            try:
                protocol_arg_lists = self.param.dispatch[
                    simulation_protocol_name]
            except KeyError:
                self._print(
                    "quiet",
                    f"WARNING: The '{simulation_protocol_name}' simulation "
                    "protocol is not supported for this type of FEP.")
                continue
            new_jobs.extend(
                self._get_new_jobs(
                    edge,
                    orig_jobname=orig_jobname,
                    fname_in=fname_in,
                    jobdir=jobdir,
                    simulation_protocol_name=simulation_protocol_name,
                    protocol_arg_lists=protocol_arg_lists,
                    parent=pj))
        return new_jobs

    def _get_fmp_fname(self, pj: cmj.Job) -> str:
        return pj.output.get(GRAPH_TAG)

    def _get_edge(self, pj: cmj.Job, g: 'graph.Graph') -> 'graph.Edge':
        return self.get_edge_from_struct_file(pj.output.struct_file(), g)

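    # A hedged, illustrative `dispatch` block (the protocol and leg names are
    # hypothetical): one entry per simulation protocol, each holding a list of
    # job commands, typically one per FEP leg:
    #
    #     fep_launcher {
    #         dispatch = {
    #             default = [
    #                 ["-JOBNAME" "$MAINJOBNAME_complex" "-m" "complex.msj"]
    #                 ["-JOBNAME" "$MAINJOBNAME_solvent" "-m" "solvent.msj"]
    #             ]
    #         }
    #     }
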
    def get_edge_from_struct_file(self, struct_file: Union[str, Path],
                                  g: 'graph.Graph'):
        for ct in struc.read_all_ct(struct_file):
            try:
                id0, id1 = ct.property[constants.MOLTYPE].split(":")
                break
            except (KeyError, ValueError):
                pass
        else:
            raise Exception(
                f"Cannot find the CT property '{constants.MOLTYPE}' needed "
                "to identify the edge in the graph.")
        return g.id2edge((id0, id1))

    def _should_skip_leg_for_edge(self, edge: 'graph.Edge', leg: str) -> bool:
        """
        Return True if the leg should be skipped for the given edge.
        """
        try:
            # FIXME: DESMOND-10696: Need to handle the fragment_linking
            # workflow. Example: How do we handle the case where
            # "solvent_fragment_hydration" is finished? (The current code
            # checks for the existence of the output mae file.) This is OK
            # for now, since all of the legs (complex, solvent, hydration,
            # ...) must be finished for the edge to be considered "completed".
            dg = getattr(edge, f'{leg}_dg')
            if dg is not None:
                # Skip running this leg since it already has a dg value.
                self._print(
                    "quiet",
                    f"INFO: Skip completed {leg} leg for {edge} edge.")
                return True
        except AttributeError:
            pass
        return False

    def _get_new_jobs(self,
                      edge: 'graph.Edge',
                      orig_jobname: str,
                      fname_in: str,
                      jobdir: str,
                      simulation_protocol_name: str,
                      protocol_arg_lists: sea.List,
                      parent: Optional[cmj.Job] = None) -> List[cmj.Job]:
        jobs = []
        pj = parent
        for job_idx, job_arg_list in enumerate(protocol_arg_lists):
            msj_jobname = job_arg_list[job_arg_list.index('-JOBNAME') + 1].val
            leg_name = util.get_leg_name_from_jobname(msj_jobname)
            if leg_name in self.param.skip_legs.val:
                self._print("quiet",
                            f"INFO: Skip {leg_name} leg for all edges.")
                continue
            if self._should_skip_leg_for_edge(edge, leg_name):
                continue
            jobname = self._gen_unique_jobname(orig_jobname)
            new_cmd, jobname = self._make_new_cmd(job_arg_list, jobname,
                                                  fname_in)
            self._print("debug", f"cmd: {new_cmd}")
            new_job = self._make_new_job(jobname, pj, new_cmd, jobdir,
                                         fname_in, job_idx)
            new_job.simulation_protocol_name = simulation_protocol_name
            jobs.append(new_job)
        return jobs

    def _get_cmd_base(self,
                      job_arg_list: sea.List) -> Tuple[List[str], sea.List]:
        base_cmd, job_arg_list = super()._get_cmd_base(job_arg_list)
        base_cmd += [
            '-lic',
            desmond_license.fep_lic(desmond_license.get_host(job_arg_list.val))
        ]
        return base_cmd, job_arg_list

    def _get_job_idx_and_args(self, job: cmj.Job) -> Tuple[int, sea.List]:
        simulation_protocol_name = getattr(job, "simulation_protocol_name",
                                           "default")
        sim_prt = self.param.dispatch[simulation_protocol_name]
        # Corresponds to job_idx in _make_new_job.
        job_idx = int(job.what.split('#')[-1])
        job_arg_list = sim_prt[job_idx]
        return job_idx, job_arg_list

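    # A hedged example of the two skip paths above: with `skip_legs = [solvent]`
    # in the stage block, every job command whose -JOBNAME maps to the
    # "solvent" leg (via util.get_leg_name_from_jobname) is dropped for all
    # edges, while _should_skip_leg_for_edge additionally drops a leg for a
    # single edge once that edge already carries a dg value for it (e.g. a
    # non-None edge.solvent_dg).
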
    def collect_inputfile(
            self, job_arg_lists: Optional[sea.List] = None) -> List[str]:
        """
        Return a list of input fnames for the given job commands. The job
        commands from all dispatch protocols are appended to a copy of
        `job_arg_lists` (or to a new list if none is passed in) before
        delegating to the parent implementation.

        :param job_arg_lists: List of job commands to scan for input files.
        :return: The filenames needed to run this stage.
        """
        job_arg_lists = copy.copy(job_arg_lists) or sea.List()
        for protocol_arg_lists in self.param.dispatch.values():
            for job_arg_list in protocol_arg_lists:
                job_arg_lists.append(job_arg_list)
        return super().collect_inputfile(job_arg_lists)

    def restart_subjobs(self, jobs: List[cmj.Job]):
        if self._fail_if_license_not_available(license.FEP_GPGPU):
            return
        super().restart_subjobs(jobs)

    def restart_edges(self,
                      edge_ids: List[str],
                      sim_protocols: Optional[Dict[str, str]] = None):
        """
        Restart the jobs for a given list of edges. Each edge_id is specified
        as '{from_short_id}_{to_short_id}'. Jobs missing from the list of
        existing jobs will be created.

        :param sim_protocols: This will be used for reconstructing missing
            jobs (if any).
        """
        to_be_restarted = defaultdict(set)
        job_temp = None
        for job in self.jobs:
            for edge_id in edge_ids:
                if edge_id in job.jobname:
                    job.status.set(cmj.JobStatus.WAITING)
                    to_be_restarted[edge_id].add(job)
                    job_temp = job_temp or job
                    break
        to_be_made = set(edge_ids) - set(to_be_restarted)
        if to_be_made:
            new_job_dict = self._make_new_launch_jobs(job_temp, to_be_made,
                                                      sim_protocols)
            self.add_jobs(set(chain.from_iterable(new_job_dict.values())))

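    # Hypothetical call sketch: each edge ID joins the two endpoint short IDs
    # with an underscore, and sim_protocols maps each edge ID to a key of the
    # `dispatch` param (all IDs below are made up):
    #
    #     stage.restart_edges(
    #         ["abc123f_def456a", "abc123f_0a1b2c3"],
    #         sim_protocols={"abc123f_def456a": "default",
    #                        "abc123f_0a1b2c3": "default"})
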
    def _make_new_launch_jobs(
            self, job_temp: cmj.Job, edge_ids_to_remake: Set[str],
            sim_protocols: Dict[str, str]) -> Dict[str, List[cmj.Job]]:
        engine = cmj.ENGINE
        new_job_dict = dict()
        fname_temp = os.path.basename(job_temp.what.split('#')[0].strip())
        current_macro = copy.deepcopy(sea.get_macro_dict())
        # TODO: For backwards compatibility, keep both for now.
        sea.update_macro_dict({
            '$MAINJOBNAME': engine.jobname,
            '$MASTERJOBNAME': engine.jobname
        })
        for edge_id in edge_ids_to_remake:
            sea.update_macro_dict({'$JOBTAG': edge_id})
            protocol_arg_lists = self.param.dispatch[sim_protocols[edge_id]]
            self._print(
                "quiet",
                f"Checkpoint file does not have a simulation job for edge "
                f"{edge_id}. Creating a new one.")
            # Prepare input/output file paths.
            fname_in = fname_temp.replace(job_temp.parent.tag, edge_id)
            fname_out = job_temp.parent.output.struct_file().replace(
                job_temp.parent.tag, edge_id)
            # Relative path for the output file.
            # Jobs only run on Linux, so use PurePosixPath here.
            path_out = Path(fname_out)
            fname_out = str(PurePosixPath(path_out.parent.name, path_out.name))
            simulation_protocol_name = sim_protocols[edge_id]
            parent = copy.deepcopy(job_temp.parent)
            parent.tag = edge_id
            parent.output.remove(parent.output.struct_file())
            parent.output.add(fname_out)
            new_jobs = self._get_new_jobs(
                None,
                orig_jobname=engine.jobname,
                fname_in=fname_in,
                jobdir=job_temp.dir,
                simulation_protocol_name=simulation_protocol_name,
                protocol_arg_lists=protocol_arg_lists,
                parent=parent)
            missing_checkpoints = []
            launcher_subdir = Path(job_temp.dir).name
            for job in new_jobs:
                checkpoint_path = Path(launcher_subdir,
                                       f'{job.jobname}-multisim_checkpoint')
                if not checkpoint_path.exists():
                    missing_checkpoints.append(checkpoint_path)
            if missing_checkpoints:
                self._print(
                    "quiet",
                    f'WARNING: Could not find checkpoint files for edge '
                    f'{edge_id.replace("_", ":")}, '
                    'extend will run WITHOUT this edge.')
                for missing_checkpoint in missing_checkpoints:
                    self._print("quiet", f'Missing: {str(missing_checkpoint)}')
                continue
            new_job_dict[edge_id] = new_jobs
        sea.set_macro_dict(current_macro)
        return new_job_dict


class FepMembraneLauncher(Multisim):
    """
    This stage launches a multisim job which first adds a lipid membrane to
    the input graph's protein receptor. The protein-membrane system and a
    representative ligand are then equilibrated by the membrane relaxation
    protocol. The relaxed protein-membrane-water CTs are then added back to
    the fmp for subsequent FEP simulations.
    """

    NAME = "fep_membrane_launcher"

    INPUT_FNAME = 'input.mae'
    CMS_OUT_FNAME = 'relaxed.cms'

    PARAM = cmj._create_param_when_needed([
        """
        DATA = {
            align_asl = "%s"
            mp = []
        }
        VALIDATE = {
            align_asl = {type = str1}
            mp = {type = list elem = {type = str}}
        }
        """ % constants.BACKBONE_CA_ASL
    ])

    def crunch(self):
        """
        If the input fmp doesn't already have a membrane, create a multisim
        job which will automate membrane creation and relaxation; otherwise,
        skip running a job in this stage.
        """
        self._print("debug", "In FepMembraneLauncher.crunch")

        from schrodinger.application.scisol.packages.fep.graph import Graph

        pj0 = self.get_prejobs()[0]
        jobname, jobdir = self._get_jobname_and_dir(pj0)
        if not os.path.isdir(jobdir):
            os.makedirs(jobdir)
        util.chdir(jobdir)
        self.graph = Graph.deserialize(pj0.output.get(GRAPH_TAG))
        new_pre_jobs = []
        if self.graph.membrane_struc and self.graph.solvent_struc:
            self._print(
                "quiet", "Input map already contains environment "
                "structures (membrane and solvent). Skipping the "
                "equilibration FepMembraneLauncher stage.")
            new_job = cmj.Job(pj0.jobname,
                              pj0,
                              self,
                              None,
                              jobdir,
                              is_output=False)
            new_job.status.set(cmj.JobStatus.SUCCESS)
            self.add_job(new_job)
        else:
            cts = membrane_utils.get_membrane_launcher_input(self.graph)
            struc.write_structures(cts, self.INPUT_FNAME)
            self._files4copy.append(os.path.abspath(self.INPUT_FNAME))
            job = self.param.job[0]
            job.val = desmond_license.add_fep_lic(job.val)
            pj0.output.set_struct_file(self.INPUT_FNAME)
            new_pre_jobs.append(pj0)
        new_jobs = self._crunch_prejobs(new_pre_jobs)
        for job in new_jobs:
            job.is_output = False
        self.add_jobs(new_jobs)
        self._print("debug", "Out FepMembraneLauncher.crunch")

    def hook_captured_successful_job(self, job):
        """
        After equilibration is complete, collect and process its output, and
        place the results into the fmp archive.
        """
        from schrodinger.application.scisol.packages.fep import fepmae

        self._print("debug",
                    "In FepMembraneLauncher.hook_captured_successful_job")
        jobname, jobdir = self._get_jobname_and_dir(job)
        util.chdir(jobdir)
        if Path(self.CMS_OUT_FNAME).exists():
            membrane_utils.align_and_update_graph(self.CMS_OUT_FNAME,
                                                  self.graph,
                                                  self.param.align_asl.val)
            self._print(
                'quiet', 'Updating FMP archive with solvent, membrane '
                'and the updated receptor structures. The coordinates '
                'of all the ligands have been updated to match the '
                'membrane-relaxed receptor.')
            fmp_fname = jobname + '.fmp'
            self.graph.write(fmp_fname)
            self._files4copy.append(os.path.abspath(fmp_fname))
            edge_mae_files = fepmae.write_fepmae(jobname,
                                                 self.graph,
                                                 skip_completed_edges=False,
                                                 zobs=self.param.mp.val)
            self.add_jobs(
                create_fep_launcher_jobs(edge_mae_files, self, job, jobdir,
                                         fmp_fname))
            # Add output files from the stage to be copied. Files generated
            # by the multisim subjob will be copied automatically.
            self._files4copy.extend(map(os.path.abspath, edge_mae_files))
        self._print("debug",
                    "Out FepMembraneLauncher.hook_captured_successful_job")


def create_fep_launcher_jobs(edge_mae_files: List[str],
                             stage: cmj.StageBase,
                             parent_job: cmj.Job,
                             job_dir: str,
                             fmp_fname: str,
                             tag: Optional[str] = None) -> List[cmj.Job]:
    """
    Create jobs for the FEP launcher stage using the input edge structures
    specified by `edge_mae_files`.
    """
    new_jobs = []
    for fname in edge_mae_files:
        new_job = cmj.Job(None, parent_job, stage, None, job_dir)
        new_job.output.add(os.path.abspath(fname))
        new_job.output.add(os.path.abspath(fmp_fname), tag=GRAPH_TAG)
        new_job.status.set(cmj.JobStatus.SUCCESS)
        # Get the edge ID from the file name.
        new_job.tag = tag or fname[-19:-4]
        new_jobs.append(new_job)
    return new_jobs


def get_edge_from_struct_file(struct_file: Union[str, Path], g: 'graph.Graph'):
    from schrodinger.application.scisol.packages.fep.utils import \
        get_ligand_node

    # Only the ligand has the FEP_HASH_ID property, not the receptor.
    for ct in structure.StructureReader(struct_file):
        short_id = ct.property.get(constants.FEP_HASH_ID)
        if short_id:
            break
    else:
        raise ValueError(
            "Could not find a CT containing the FEP_HASH_ID property")
    for edge in g.edges_iter():
        if get_ligand_node(edge).short_id == short_id:
            return edge
    raise ValueError(
        f"Could not find edge corresponding to given job: {short_id}")
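
# Note the two edge-lookup strategies in this module:
# FepLauncher.get_edge_from_struct_file reads the '{id0}:{id1}' pair stored in
# the constants.MOLTYPE CT property, while the module-level
# get_edge_from_struct_file matches the ligand's constants.FEP_HASH_ID against
# the short_id of each edge's ligand node.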