"""
A stage for running Prime jobs.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Matvey Adzhigirey
# Written for IFD (Ev:51619)
import os
import tarfile
import schrodinger.application.prime.input as primeinput
from schrodinger import structure
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage
[docs]class PrimeStage(stage.Stage):
"""
Prime Active Site Optimization stage: Performs the familiar sidechain prediction plus
minimization, used by IFD jobs. The residues used are those in RESIDUE_LIST.
"""
[docs] def __init__(self, *args, **kwargs):
"""
See class docstring.
"""
specs = """
PRIME_TYPE = option('SIDE_PRED', 'LOOP_BLD', 'REAL_MIN', 'SITE_OPT', 'ENERGY', 'COVALENT_DOCKING')
THREADS = integer(default=None)
MAX_JOBS = integer(default=None)
NUM_OUTPUT_STRUCT = integer(default=None)
ECUTOFF = float(default=None)
USE_RANDOM_SEED = boolean(default=None)
SEED = integer(default=None)
USE_CRYSTAL_SYMMETRY = boolean(default=None)
INT_DIEL = float(default=None)
EXT_DIEL = float(default=None)
USE_MAE_CHARGES = boolean(default=None)
USE_MEMBRANE = boolean(default=None)
SELECT = option('all', 'pick')
RESIDUE_FILE = string(default=None)
LIGAND = string(default=None)
NPASSES = integer(default=None)
LOOP_i_RES_j = string(default=None)
RES_SPHERE = string(default=None)
MAX_CA_MOVEMENT = float(default=None)
MIN_OVERLAP = float(default=None)
RESIDUE_i = string(default=None)
INCLUDE_RESIDUE = boolean(default=None)
"""
stage.Stage.__init__(self,
specs=specs,
allow_extra_keywords=True,
*args,
**kwargs)
# NOTE: allow extra keywords, because ALL of the stages keywords get
# passed on to Prime, even if they are not in the specification.
self.status = "NOT STARTED"
self.jobdj = None
self.prime_jobnames = []
# Will quit the program if this product is not installed
self.requiredProduct('psp')
# Used by Pipeline to associate -host_psp with this stage:
self.setMainProduct('psp')
# The input #1 needs to be of type 'structures'
self.addExpectedInput(1, "structures", True)
# The output #1 will be of type 'structures'
self.addExpectedOutput(1, "structures", True)
[docs] def setupJobs(self):
"""
Sets up the Prime jobs, which are distributed via JobDJ.
"""
exe = os.path.join(os.environ['SCHRODINGER'], 'prime')
# Create a JobDJ instance based on VSW parameters:
self.jobdj = self.getJobDJ()
self.prime_jobnames = []
# Get the filenames of ligand files specified in input #1:
structure_files = self.getInput(1).getFiles()
keywords = {key: value for key, value in self.items()}
# Approximate job times (according to Tyler as of May 2011):
#
# "Unfortunately, some of these job types have huge variations in times.
# SIDE_PRED can take 30 seconds to 2 hours, depending on how many side
# chains the user selects. There's no default number, and typical use
# cases vary, so any number would really be arbitrary. The same is true
# for LOOP_BLD and REAL_MIN. SITE_OPT, if using the typical settings in
# IFD, probably takes about 5 minutes. ENERGY is probably 30-60
# seconds, and COVALENT_DOCKING is probably 10 minutes."
jobnum = 0
for ligfile in structure_files:
for st in structure.StructureReader(ligfile):
jobnum += 1
prime_jobname = self.genFileName(filenum=jobnum)
# FIXME use compressed Maestro format for intermediate files.
stfile = prime_jobname + '.mae'
st.write(stfile)
keywords['STRUCT_FILE'] = stfile
p = primeinput.Prime(keywords)
p.write(prime_jobname + '.inp')
cmd = [exe, prime_jobname]
self.jobdj.addJob(cmd)
self.prime_jobnames.append(prime_jobname)
[docs] def processJobOutputs(self):
"""
Checks for failure of any subjobs (by reading the .log file). Renames
the Prime output files. Raises a RuntimeError if any subjob failed.
"""
self.checkFiles(self.output_files)
merged_output_file = self.genOutputFileName(1,
filenum=1,
extension=".mae")
# FIXME use compressed Maestro format.
writer = structure.StructureWriter(merged_output_file)
st_num = 0
for subjob_outfile in self.output_files:
# Prime may produce more than one structure per subjob:
for st in structure.StructureReader(subjob_outfile):
st_num += 1
writer.append(st)
writer.close()
self.checkFile(merged_output_file)
# Set the output #1 of this stage to the list of renamed output files:
self.setOutput(1, pipeio.Structures([merged_output_file], st_num))
[docs] def operate(self):
"""
Perform an operation on the input files. There are setup, running,
and post-processing steps, and the stage records its current status so
that it can be restarted in that step if there is a failure. Raises
a RuntimeError if the JobDJ run() method fails, or if the stage
finishes with an improper status.
"""
if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
self.status = "SETTING UP JOBS"
# Setup JobDJ:
self.setupJobs()
# If restarting, go directly to next step.
self.status = "RUNNING JOBS"
self.dump()
if self.status == "RUNNING JOBS":
# Update JobDJ to correct options (may change if restarting):
self.setJobDJOptions(self.jobdj)
self.runJobDJ(self.jobdj)
self.output_files = []
# Make sure that all Prime outputs are present:
for prime_jobname in self.prime_jobnames:
logfile = prime_jobname + '.log'
outfile = prime_jobname + '-out.maegz'
if os.path.isfile(outfile): # Output file exists:
self.output_files.append(outfile)
else: # Output poses file is missing:
self.error("Prime output file is missing: %s" % outfile)
if not os.path.isfile(logfile):
msg = "Both output and log files are missing for job: %s" % prime_jobname
raise RuntimeError(msg)
# Output file is missing but log file is in place:
# Parse the log file to see if Glide produced any
# structures:
no_poses_warning = False
for line in pipeutils.BackwardsReader(logfile):
if line == 'ERROR: No valid structure conformations were found.':
no_poses_warning = True
break
if no_poses_warning: # No output poses
msg = 'WARNING: Prime produced no output structures for job: %s\n' % prime_jobname
self.warning(msg)
continue # Go to the next job
# Subjob failed without "no poses" message:
msg = pipeutils.get_last_20_lines(logfile)
errfile = prime_jobname + '.err'
msg += pipeutils.get_last_20_lines(errfile)
msg += "PRIME SUBJOB FAILED (no output file)\n"
self.exit(msg)
self.status = "PROCESSING FILES"
self.dump()
if self.status == "PROCESSING FILES":
# Rename the Prime output files:
self.processJobOutputs()
self.status = "CLEANING UP"
self.dump()
if self.status == "CLEANING UP":
if not self.getCleanupRequested():
self.status = "COMPLETE"
return
self.info("Cleaning up...")
archive_files = []
for jobname in self.prime_jobnames:
# Remove these files:
for ext in [".mae", "-out.maegz", "-out.pdb"]:
filename = jobname + ext
if os.path.isfile(filename):
os.remove(filename)
# Archive these files:
for ext in [".inp", ".log", ".err"]:
filename = jobname + ext
if os.path.isfile(filename):
archive_files.append(filename)
if archive_files: # There are files to archive:
# Create an archive for log files:
tar_file = self.genFileName(extension="_logs.tar.gz")
log_tar = tarfile.open(tar_file, 'w:gz')
for filename in archive_files:
self.info(' Archiving %s...' % filename)
log_tar.add(filename)
# Close the archive file:
log_tar.close()
# Remove the files that were added to the archive:
for filename in archive_files:
os.remove(filename)
self.status = "COMPLETE"
self.dump()
# COMPLETE
return
[docs]class MMGBSAStage(stage.Stage):
"""
Stage for running Prime MM-GBSA on input structures.
First structure in the input set is assumed to be the receptor.
There are no keywords specific to the stage.
The stage takes one input structure file set and generates one set of
corresponding output structure files (also in PV format).
"""
[docs] def __init__(self, *args, **kwargs):
"""
See class docstring.
"""
specs = """
USE_MAE_CHARGES = boolean(default=False) # Whether to use atom partial charges in input Maestro file in simulations
OUTPUT_LIG_STRAING = boolean(default=False) # Whether to output an estimate of the ligand strain energy
USE_MEMBRANE = boolean(default=False) # Use Prime implicit membrane model in protein and complex simulations
"""
stage.Stage.__init__(self, specs=specs, *args, **kwargs)
self.jobdj = None
self.status = "NOT STARTED"
self.addExpectedInput(1, "structures", True)
self.addExpectedOutput(1, "structures", True)
[docs] def mainProduct(self):
"""
Product string identifying the job type. Used for running different
job types with different host sets.
"""
return 'prime'
[docs] def setupJobs(self):
"""
Sets up Prime MM-GBSA jobs, which are distributed via JobDJ. There
will be one MMGBSA job for each input PV file. This method clears the
working directory of previous input file symlinks, output files, and
log files (if any).
"""
# NOTE: Since MMGBSA is run on XP output in VSW, there will be usually
# only one input file, so only one master MMGBSA subjob will be
# created.
self.jobdj = self.getJobDJ()
in1 = self.getInput(1)
self.info('Counting number of structures...')
self.input_st_num = in1.count() - 1 # Subtract receptor
if self.input_st_num <= 0:
raise RuntimeError('ERROR: No ligands present in the input set')
# Prime-MMGBSA ideal job length range:
min_job_size = 30 # about 1 hour (assuming 2 min/ligand)
max_job_size = 500 # about 15 hours?
self.info('Input number of structures (w/o receptor): %i' %
self.input_st_num)
# Determine the number of subjobs to split the into:
(max_st_per_file, njobs,
adjusted) = self.getAdjustedNJobs(self.input_st_num, min_job_size,
max_job_size)
if adjusted:
self.info(
"Adjusting number of jobs to yield %i-%i ligands per job" %
(min_job_size, max_job_size))
self.mmgbsa_jobnames = []
st_in_file = 0
writer = None
filenum = 0
receptor = None
for infile in in1.getFiles():
# Ev:71197 and Ev:72410
for st in structure.StructureReader(infile):
if not receptor:
receptor = st
continue
if not writer or st_in_file == max_st_per_file:
filenum += 1
jobname = self.genFileName(filenum=filenum)
self.mmgbsa_jobnames.append(jobname)
writer = structure.StructureWriter(jobname + '.mae')
writer.append(receptor)
st_in_file = 0
writer.append(st)
st_in_file += 1
# Close the last file:
writer.close()
# FIXME use compressed Maestro format for intermediate files (in loops
# above and below)
for jobname in self.mmgbsa_jobnames:
infile = jobname + '.mae'
outfile = jobname + '-out.maegz'
logfile = jobname + '.log'
# Remove previous files:
for f in [outfile, logfile]:
if os.path.isfile(f):
os.remove(f)
# Run $SCHRODINGER/prime_mmgbsa:
cmd = ['prime_mmgbsa', infile]
cmd.append("-OVERWRITE") # Fix for Ev:129307
if self['USE_MAE_CHARGES']:
cmd.append('-cmae')
if self['OUTPUT_LIG_STRAING']:
cmd.append('-lstrain')
if self['USE_MEMBRANE']:
cmd.append('-membrane')
self.jobdj.addJob(cmd)
[docs] def processJobOutputs(self):
"""
Process the MMGBSA output.
Returns with an error if any of the output files are missing.
"""
writer = structure.MultiFileStructureWriter(self.genOutputFileName(1),
extension=".maegz")
recep_written = False
for jobname in self.mmgbsa_jobnames:
logfile = jobname + '.log'
outfile = jobname + '-out.maegz'
if not os.path.isfile(logfile):
self.exit("Missing MMGBSA log file: %s" % logfile)
for line in open(logfile):
if "FATAL ERROR" in line:
self.exit(line)
if not os.path.isfile(outfile):
self.exit("Missing MMGBSA output file: %s" % outfile)
st_num = 0
for st in structure.StructureReader(outfile):
st_num += 1
if st_num == 1:
if not recep_written:
writer.append(st)
recep_written = True
else:
# Not the receptor:
writer.append(st)
writer.close()
output_obj = pipeio.Structures(writer.getFiles(),
writer.getNumStructures())
self.setOutput(1, output_obj)
[docs] def operate(self):
"""
Perform an operation on the input files. There are setup, running,
and post-processing steps, and the stage records its current status so
that it can be restarted in that step if there is a failure. Raises
a RuntimeError if the JobDJ run() method fails, or if the stage
finishes with an improper status.
"""
if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
self.status = "SETTING UP JOBS"
self.setupJobs()
self._job_outputs = []
# If restarting, go directly to next step.
self.status = "RUNNING JOBS"
self.dump()
if self.status == "RUNNING JOBS":
# Update JobDJ to correct options (may change if restarting):
self.setJobDJOptions(self.jobdj)
self.runJobDJ(self.jobdj)
# Make sure all outputs exist:
for jobname in self.mmgbsa_jobnames:
self.checkFile(jobname + '-out.maegz')
self.status = "PROCESSING FILES"
self.dump()
if self.status == "PROCESSING FILES":
self.processJobOutputs()
return
# COMPLETE
return
# EOF