import copy
import os
import shutil
from pathlib import Path
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union
import numpy as np
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import license as desmond_license
from schrodinger.application.desmond import solubility_utils
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.stage.jobs import DesmondJob
from schrodinger.application.desmond.stage.launcher import Multisim, \
FepLauncher, create_fep_launcher_jobs, get_edge_from_struct_file
from schrodinger.application.desmond.stage.prepare import AssignCustomCharge
from schrodinger.utils import sea
from schrodinger.utils import subprocess
if TYPE_CHECKING:
from schrodinger.application.scisol.packages.fep import graph
__all__ = [
'GenerateSolubilityFepStructures', 'SolubilityMdLauncher',
'SolubilityFepLauncher', 'SolubilityFepAnalysis'
]
# ############################## SUBJOB STAGES #################################
[docs]class GenerateSolubilityFepStructures(cmj.StageBase):
"""
This analyzes the results of an MD simulation
containing a solvated disordered system and
sets up the structures that will be used for
sublimation and hydration fep.
If more than 10% of the molecules
are over 90% solvent exposed, this marks
the molecules as soluble and sublimation fep
should not be run. The first structure is the
baseline structure used for hydration fep.
The `num_sublimation_strucs` strucutres after it are for
sublimation fep.
"""
NAME = "generate_solubility_fep_structures"
TAG = "GENERATE_SOLUBILITY_FEP_STRUCTURES"
PARAM = cmj._create_param_when_needed("""
DATA = {
num_sublimation_strucs = %d
require_custom_charge = true
hydration_only = false
solvation_only = false
}
VALIDATE = {
num_sublimation_strucs = {type = int1}
require_custom_charge = {type = bool}
hydration_only = {type = bool}
solvation_only = {type = bool}
}
""" % (solubility_utils.NUM_SUBLIMATION_JOBS,))
[docs] def crunch(self):
self._print("debug", "In GenerateSolubilityFepStructures.crunch")
hydration_or_solvation_only = self.param.hydration_only.val or self.param.solvation_only.val
for pj in self.get_prejobs():
jobname, dir = self._get_jobname_and_dir(pj)
if not os.path.isdir(dir):
os.makedirs(dir)
util.chdir(dir)
new_job = DesmondJob(pj.task, pj.gid, pj.permanent_restrain,
pj.permanent_group, jobname, pj, self, None,
dir)
new_job.need_host = False
new_job.status.set(cmj.JobStatus.SUCCESS)
out_fname = f"{jobname}-out.mae"
solute_fname = pj.output.struct_file()
solute_ct = structure.Structure.read(solute_fname)
# Generate the baseline_ct from the custom charge stage if found
# otherwise extract the first molecule from the solute_ct
custom_charge_fname = None
ppj = pj
while ppj is not None:
if isinstance(ppj.stage, AssignCustomCharge):
custom_charge_fname = ppj.output.struct_file()
break
ppj = ppj.parent
if custom_charge_fname is not None:
baseline_ct = structure.Structure.read(custom_charge_fname)
elif self.param.require_custom_charge.val:
self._print(
"quiet",
f"ERROR: {self.NAME} failed: Could not find custom charge stage."
)
new_job.status.set(cmj.JobStatus.BACKEND_ERROR)
self.add_job(new_job)
continue
else:
self._print(
"quiet",
"No custom charges found, using simulation without custom charges."
)
# No custom charges, use the first molecule in the solute_ct
for mol in solute_ct.molecule:
baseline_ct = mol.extractStructure(copy_props=True)
break
# Use the reference coordinates if saved
baseline_ct = struc.get_reference_ct(baseline_ct)
# Setup for Hydration FEP
for atom in baseline_ct.atom:
atom.property[constants.FEP_ABSOLUTE_ENERGY] = 1
atom.property[constants.FEP_ABSOLUTE_LIGAND] = 1
exposure_list = solubility_utils.get_molecule_exposures(
baseline_ct, solute_ct)
if solubility_utils.is_soluble(
exposure_list) and not hydration_or_solvation_only:
self._print("quiet", "Molecule is soluble.")
# Mark structure as soluble
solubility_utils.set_structure_soluble(baseline_ct)
baseline_ct.property[
constants.
SOLUBILITY_SUBJOB] = constants.SOLUBILITY_SUBJOB.VAL.SOLUBLE
baseline_ct.write(out_fname)
# Pass to next stage
new_job.output.set_struct_file(os.path.abspath(out_fname))
self.add_job(new_job)
continue
# Prepare the structures for the next stage.
# Each ct has the amorphous solid in solvent,
# but a different ligand molecule marked for
# Absolute FEP calculations.
prepared_cts = solubility_utils.extract_most_exposed_solute_molecules(
solute_ct,
exposure_list,
extract_count=self.param.num_sublimation_strucs.val)
if len(prepared_cts) == 0 and not hydration_or_solvation_only:
self._print(
"quiet",
f"ERROR: {self.NAME} failed: Could not prepare molecules for"
" sublimation FEP, no solute molecules present.")
new_job.status.set(cmj.JobStatus.BACKEND_ERROR)
self.add_job(new_job)
continue
# Write out the prepared cts
with structure.StructureWriter(out_fname) as w:
baseline_ct.property[
constants.
SOLUBILITY_SUBJOB] = constants.SOLUBILITY_SUBJOB.VAL.HYDRATION
w.append(baseline_ct)
for ct in prepared_cts:
ct.property[
constants.
SOLUBILITY_SUBJOB] = constants.SOLUBILITY_SUBJOB.VAL.SUBLIMATION
w.append(ct)
# Pass to next stage
new_job.output.set_struct_file(os.path.abspath(out_fname))
self.add_job(new_job)
self._print("debug", "Out GenerateSolubilityFepStructures.crunch")
# ############################# MAIN JOB STAGES ##############################
[docs]class SolubilityMdLauncher(Multisim):
"""
Launches the solubility md workflow.
"""
NAME = "solubility_md_launcher"
PARAM = cmj._create_param_when_needed("""
DATA = {
md_job = []
compress = ""
input_graph_file = ""
}
VALIDATE = {
md_job = {type = list size = 0}
compress = {type = str}
input_graph_file = {type = str1}
}
""")
[docs] def crunch(self):
"""
Set up the simulation by extracting the structures for
each job.
"""
self._print("debug", "SolubilityMdLauncher::crunch")
for pj in self.get_prejobs():
fname = pj.output.struct_file()
self._setup_jobs(fname)
break
super(SolubilityMdLauncher, self).crunch()
for job in self.filter_jobs(old=[False]):
# Output jobs will be created in poststage
job.is_output = False
self._print("debug",
f"SolubilityMdLauncher::crunch:done {self.param.job}")
[docs] def restart_subjobs(self, jobs: List[cmj.Job]) -> None:
"""
Prepare the subjobs for restart.
"""
self._print("debug", "SolubilityMdLauncher::restart_subjobs")
fnames = set()
for job in jobs:
fnames.add(job.parent.output.struct_file())
for fname in fnames:
# all subjobs have the same parent
self._setup_jobs(fname)
break
super(SolubilityMdLauncher, self).restart_subjobs(jobs)
self._print("debug", "SolubilityMdLauncher::restart_subjobs:done")
def _setup_jobs(self, fname_in: str):
"""
Setup the param.job list with the jobs to run.
"""
# Reset the list of jobs to run
self.param.job = sea.List()
cts = struc.read_all_structures(fname_in)
with structure.StructureWriter(fname_in) as w:
for ict, ct in enumerate(cts):
ct.property[
constants.
SOLUBILITY_SUBJOB] = constants.SOLUBILITY_SUBJOB.VAL.MD
ct.property[constants.CT_INDEX] = ict + 1
# DESMOND-9210: Clear absolute FEP properties which
# cause the job to fail.
for atom in ct.atom:
for prop_name in [
constants.FEP_ABSOLUTE_ENERGY,
constants.FEP_ABSOLUTE_LIGAND
]:
if prop_name in atom.property:
del atom.property[prop_name]
w.append(ct)
for ict, ct in enumerate(cts):
# Store the original title to be used later
job = copy.deepcopy(self.param.md_job)
ct_id = ct.property[constants.FEP_HASH_ID]
subjobname = f"$MAINJOBNAME_md_{ct_id}"
# Set the index for the correct structure
job.extend([
"-set",
f'stage[1].set_family.extract_structures.start_index={ict}',
"-set",
f'stage[1].set_family.extract_structures.end_index={ict+1}'
])
job.extend(["-o", subjobname + "-out.mae", "-JOBNAME", subjobname])
# DESMOND-8692: Use FEP license for Solubility jobs.
job.val = desmond_license.add_fep_lic(job.val)
self.param.job.append(job)
[docs] def poststage(self):
"""
For any md jobs that determined the ligand to be soluble, mark the
corresponding edge in the graph with is_soluble
"""
from schrodinger.application.scisol.packages.fep import graph
from schrodinger.application.scisol.packages.fep.utils import get_ligand_node
self._print("debug", "In SolubilityMdLauncher.poststage")
md_jobs = self.get_md_jobs()
_, jobdir = self._get_jobname_and_dir(md_jobs[0])
os.chdir(jobdir)
output_fmp_fname = self.param.input_graph_file.val
g = graph.Graph.deserialize(
Path(cmj.ENGINE.base_dir, self.param.input_graph_file.val))
for job in md_jobs:
st_file = job.output.struct_file()
output_struc = structure.Structure.read(st_file)
edge = get_edge_from_struct_file(st_file, g)
ligand_node = get_ligand_node(edge)
ligand_node.is_soluble = solubility_utils.is_structure_soluble(
output_struc)
self.add_jobs(
create_fep_launcher_jobs([st_file],
self,
job,
jobdir,
output_fmp_fname,
tag=ligand_node.short_id))
g.write(output_fmp_fname)
self._files4copy.append(os.path.abspath(output_fmp_fname))
self._print("debug", "Out SolubilityMdLauncher.poststage")
[docs] def get_md_jobs(self):
return self.filter_jobs(status=[cmj.JobStatus.SUCCESS],
old=[False],
is_for_jc=[True])
[docs]class SolubilityFepLauncher(FepLauncher):
"""
Launches the solubility fep workflow.
"""
NAME = "solubility_fep_launcher"
PARAM = cmj._create_param_when_needed("""
DATA = {
compress = ""
}
VALIDATE = {
compress = {type = str}
}
""")
[docs] def check_param(self):
ev = super().check_param()
if "input_graph_file" in self.param:
ev.record_error(
err="MSJ contains stages generated with an "
"outdated release. Please regenerate the files using "
"22-2 or later")
return ev
[docs] def get_edge_from_struct_file(self, struct_file: Union[str, Path],
g: 'graph.Graph'):
return get_edge_from_struct_file(struct_file, g)
def _should_skip_leg_for_edge(self, edge: 'graph.Edge', leg: str) -> bool:
"""
Return True if the leg should be skipped.
MD should only run if no fep legs have run
for expanding a map that already has dg values.
"""
from schrodinger.application.scisol.packages.fep.utils import get_ligand_node
if super()._should_skip_leg_for_edge(edge, leg):
return True
ligand_node = get_ligand_node(edge)
if ligand_node.is_soluble:
self._print(
"quiet",
f"Analysis of MD implies '{ligand_node.struc.title}' is "
f"soluble. Skipping sublimation and hydration FEP.")
# Soluble, skip FEP.
return True
return False
[docs]class SolubilityFepAnalysis(cmj.StageBase):
"""
This runs the solubility fep analysis.
"""
NAME = "solubility_fep_analysis"
TAG = "SOLUBILITY_FEP_ANALYSIS"
PARAM = cmj._create_param_when_needed("""
DATA = {
report = "$MAINJOBNAME_report.csv"
previous_report_file = ""
hydration_only = false
solvation_only = false
input_graph_file = ""
}
VALIDATE = {
report = {type = str}
previous_report_file = {type = str _check = multisim_file_exists}
hydration_only = {type = bool}
solvation_only = {type = bool}
input_graph_file = {type = str1}
}
""")
[docs] def crunch(self):
self._print("debug", "In SolubilityFepAnalysis.crunch")
# Nothing to do
if len(self.get_prejobs()) == 0:
self._print("debug", "Out SolubilityFepAnalysis.crunch")
return
# Set up output job
pj = self.get_prejobs()[0]
jobname, dir = self._get_jobname_and_dir(pj)
# The parents need to include all prejobs for restarting
# failed jobs to work properly (DESMOND-9985).
new_job = cmj.Job(jobname, pj, self, None, dir)
new_job.other_parent = self.get_prejobs()[1:]
new_job.need_host = False
new_job.status.set(cmj.JobStatus.SUCCESS)
analysis_failed = False
output_cts = []
report_csv = ""
def _update_ct_properties(ct, status):
for prop in ['s_chorus_trajectory_file', 's_m_original_cms_file']:
try:
del ct.property[prop]
except KeyError:
continue
ct.property[constants.SOLUBILITY_STATUS] = status
# Map from hash_id to a list of corresponding pre_jobs.
# unique is for restart/extended jobs which may duplicate
# the pre job.
hash_id_to_pre_job = {}
for pj in util.unique(self.get_prejobs()):
struc_fname = pj.output.struct_file()
ct = structure.Structure.read(struc_fname)
ct_id = ct.property[constants.FEP_HASH_ID]
jobs = hash_id_to_pre_job.get(ct_id, [])
jobs.append(pj)
hash_id_to_pre_job[ct_id] = jobs
# For each ligand
finished_compounds = set()
max_num_sublimation = 0
max_num_solvation = 0
for ct_id, pre_jobs in hash_id_to_pre_job.items():
# For each prejob associated with the ligand
is_soluble = False
hydration_ct = None
hydration_dg = None
solvation_dgs = []
sublimation_dgs = []
for pj in pre_jobs:
_, dir = self._get_jobname_and_dir(pj)
if not os.path.isdir(dir):
os.makedirs(dir)
util.chdir(dir)
struc_fname = pj.output.struct_file()
ct = structure.Structure.read(struc_fname)
ct.title = ct.property[constants.FEP_ORIGINAL_TITLE]
# Check if already marked as soluble
if solubility_utils.is_structure_soluble(ct):
self._print("quiet", f"Results for '{ct.title}'\n")
self._print(
"quiet", "Dissolution free energy: \n\t" +
f"{solubility_utils.SOLUBLE_MARKER}" + "\n")
report_csv += solubility_utils.get_report(ct.title,
is_soluble=True)
_update_ct_properties(
ct, constants.SOLUBILITY_STATUS.VAL.FINISHED)
output_cts.append(ct)
is_soluble = True
finished_compounds.add(ct.property[constants.FEP_HASH_ID])
break
cms_model = cms.Cms(struc_fname)
solute_ct = solubility_utils.extract_solute_ct(cms_model)
solute_ct.title = solute_ct.property[
constants.FEP_ORIGINAL_TITLE]
transfer_fene = solubility_utils.get_transfer_free_energy(
cms_model)
# Check for the free energy property
if transfer_fene is None:
analysis_failed = True
self._print(
"quiet",
f"ERROR: {self.NAME} failed: {constants.FEP_TRANSFER_FREE_ENERGY} not found in FEP result for {ct.title}."
)
_update_ct_properties(
ct, constants.SOLUBILITY_STATUS.VAL.FAILED)
output_cts.append(ct)
break
# Add to hydration or sublimation
if solute_ct.mol_total == 1:
if self.param.solvation_only.val:
solvation_dgs.append(transfer_fene)
hydration_ct = solute_ct
else:
if hydration_dg is not None:
analysis_failed = True
self._print(
"quiet",
f"ERROR: {self.NAME} failed: Duplicate hydration FEP result for {ct.title}."
)
_update_ct_properties(
ct, constants.SOLUBILITY_STATUS.VAL.FAILED)
output_cts.append(ct)
break
hydration_ct = solute_ct
hydration_dg = transfer_fene
else:
sublimation_dgs.append(transfer_fene)
if not is_soluble:
# Check for missing FEP results
missing_result_name = ""
if self.param.solvation_only.val:
if not len(solvation_dgs):
missing_result_name = "Solvation"
else:
if hydration_dg is None:
missing_result_name = "Hydration"
if len(sublimation_dgs
) == 0 and not self.param.hydration_only.val:
missing_result_name = "Sublimation"
if missing_result_name:
analysis_failed = True
self._print(
"quiet",
f"ERROR: {self.NAME} failed: {missing_result_name} FEP result was not found for {ct.title}."
)
_update_ct_properties(
ct, constants.SOLUBILITY_STATUS.VAL.FAILED)
output_cts.append(ct)
continue
incomplete_sublimation_jobs = solubility_utils.NUM_SUBLIMATION_JOBS - len(
sublimation_dgs)
if incomplete_sublimation_jobs and not (
self.param.hydration_only.val or
self.param.solvation_only.val):
self._print(
"quiet",
f"WARNING: {incomplete_sublimation_jobs} sublimation job(s) did not complete successfully. "
f"Continuing the calculation with the remaining {len(sublimation_dgs)} sublimation job(s)."
)
if self.param.hydration_only.val:
sublimation_dgs = []
elif self.param.solvation_only.val:
sublimation_dgs = []
solvation_dg = solubility_utils.bootstrapped_solvation_free_energy(
solvation_dgs)
else:
# Calculate dissolution free energy
median_sublimation_dg = solubility_utils.median_sublimation_free_energy(
sublimation_dgs)
dissolution_dg = solubility_utils.calculate_solubility_free_energy(
hydration_dg, median_sublimation_dg)
# Output structure with solubility property
hydration_ct.property[
constants.FEP_SOLUBILITY] = f"{dissolution_dg}"
kT = constants.BOLTZMANN * 300
solubility_uM = np.exp(-dissolution_dg.val / kT) * 1e6
hydration_ct.property[
constants.FEP_SOLUBILITY_MICROMOLAR] = solubility_uM
report_csv += solubility_utils.get_report(
ct.title, hydration_dg, sublimation_dgs, solvation_dgs)
_update_ct_properties(hydration_ct,
constants.SOLUBILITY_STATUS.VAL.FINISHED)
output_cts.append(hydration_ct)
finished_compounds.add(ct.property[constants.FEP_HASH_ID])
max_num_sublimation = max(max_num_sublimation, len(sublimation_dgs))
max_num_solvation = max(max_num_solvation, len(solvation_dgs))
self._print(
"debug",
f'Finished compounds in the current job: {finished_compounds}')
previous_compounds = set()
if self.param.previous_report_file.val:
report = Path(cmj.ENGINE.base_dir,
self.param.previous_report_file.val).read_text()
try:
for ligand in solubility_utils.parse_report(report).keys():
previous_compounds.add(util.str2hexid(ligand))
except ValueError:
# old format
self._print("debug", "Reading old report format...")
for line in report.split("\n"):
if line.startswith('Results for'):
ligand = line.strip().split()[-1][1:-1]
previous_compounds.add(util.str2hexid(ligand))
self._print(
"debug",
f'Previously complete compounds in the current job: {previous_compounds}'
)
new_compounds = finished_compounds - previous_compounds
self._print("debug",
f'New compounds in the current job: {new_compounds}')
self._print(
"quiet",
f'Number of new compounds completed in the current job: {len(new_compounds)}'
)
if len(new_compounds):
desmond_license.checkout_model2_compounds(
len(new_compounds), product=constants.PRODUCT.FEP)
report_basename = os.path.splitext(
os.path.basename(self.param.report.val))[0]
out_mae_fname = f'{report_basename}-out.mae'
with structure.StructureWriter(out_mae_fname) as w:
for ct in output_cts:
w.append(ct)
new_job.output.set_struct_file(os.path.abspath(out_mae_fname))
# Write out report
with open(self.param.report.val, 'w') as f:
if self.param.solvation_only.val:
f.write(
solubility_utils.get_header_solvation(max_num_solvation))
else:
f.write(solubility_utils.get_header(max_num_sublimation))
f.write(report_csv)
new_job.output.add(os.path.abspath(self.param.report.val))
# Copy files to root dir
out_mae_fname_copy = os.path.join(cmj.ENGINE.base_dir, out_mae_fname)
shutil.copy(out_mae_fname, out_mae_fname_copy)
self._files4copy.append(out_mae_fname_copy)
report_copy = os.path.join(cmj.ENGINE.base_dir, self.param.report.val)
shutil.copy(self.param.report.val, report_copy)
self._files4copy.append(report_copy)
util.chdir(cmj.ENGINE.base_dir)
jobname = cmj.ENGINE.jobname
checkpoint = f'{jobname}-multisim_checkpoint'
cmd = [
'run', '-FROM', 'scisol', "fep_mapper_cleanup.py", checkpoint,
'-force', '-fmp', self.param.input_graph_file.val, '-out-fmp',
f"{jobname}_out.fmp"
]
self._print("quiet", " ".join(cmd))
exit_status = subprocess.call(cmd)
if exit_status != 0:
analysis_failed = True
self._print("quiet",
"ERROR: Unable to complete fep_mapper_cleanup.py")
else:
self._files4copy.append(f"{jobname}_out.fmp")
self._files4copy.append(f"{jobname}_out.fmpdb")
if analysis_failed:
new_job.status.set(cmj.JobStatus.BACKEND_ERROR)
else:
new_job.status.set(cmj.JobStatus.SUCCESS)
self.add_job(new_job)
self._print("debug", "Out SolubilityFepAnalysis.crunch")