# Source code for schrodinger.pipeline.stages.prime

"""
A stage for running Prime jobs.

Copyright Schrodinger, LLC. All rights reserved.

"""
# Contributors: Matvey Adzhigirey

# Written for IFD (Ev:51619)

import os
import tarfile

import schrodinger.application.prime.input as primeinput
from schrodinger import structure
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage


class PrimeStage(stage.Stage):
    """
    Prime Active Site Optimization stage: performs the familiar sidechain
    prediction plus minimization, used by IFD jobs. The residues used are
    those in RESIDUE_LIST.

    Input #1: set of structure files (one Prime subjob per structure).
    Output #1: a single merged structure file with all Prime results.
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.

        All positional and keyword arguments are forwarded to
        `stage.Stage.__init__` along with the keyword specification below.
        """
        # ConfigObj-style keyword specification for this stage. Keywords not
        # listed here are still accepted (allow_extra_keywords=True below)
        # because every stage keyword is passed through to Prime verbatim.
        specs = """
        PRIME_TYPE = option('SIDE_PRED', 'LOOP_BLD', 'REAL_MIN', 'SITE_OPT', 'ENERGY', 'COVALENT_DOCKING')
        THREADS = integer(default=None)
        MAX_JOBS = integer(default=None)
        NUM_OUTPUT_STRUCT = integer(default=None)
        ECUTOFF = float(default=None)
        USE_RANDOM_SEED = boolean(default=None)
        SEED = integer(default=None)
        USE_CRYSTAL_SYMMETRY = boolean(default=None)
        INT_DIEL = float(default=None)
        EXT_DIEL = float(default=None)
        USE_MAE_CHARGES = boolean(default=None)
        USE_MEMBRANE = boolean(default=None)
        SELECT = option('all', 'pick')
        RESIDUE_FILE = string(default=None)
        LIGAND = string(default=None)
        NPASSES = integer(default=None)
        LOOP_i_RES_j = string(default=None)
        RES_SPHERE = string(default=None)
        MAX_CA_MOVEMENT = float(default=None)
        MIN_OVERLAP = float(default=None)
        RESIDUE_i = string(default=None)
        INCLUDE_RESIDUE = boolean(default=None)
        """
        stage.Stage.__init__(self,
                             specs=specs,
                             allow_extra_keywords=True,
                             *args,
                             **kwargs)
        # NOTE: allow extra keywords, because ALL of the stages keywords get
        # passed on to Prime, even if they are not in the specification.

        # Restartable state machine marker; advanced by operate() and
        # persisted via self.dump().
        self.status = "NOT STARTED"
        self.jobdj = None
        # Base names of the per-structure Prime subjobs created in setupJobs().
        self.prime_jobnames = []

        # Will quit the program if this product is not installed
        self.requiredProduct('psp')

        # Used by Pipeline to associate -host_psp with this stage:
        self.setMainProduct('psp')

        # The input #1 needs to be of type 'structures'
        self.addExpectedInput(1, "structures", True)
        # The output #1 will be of type 'structures'
        self.addExpectedOutput(1, "structures", True)

    def setupJobs(self):
        """
        Sets up the Prime jobs, which are distributed via JobDJ.

        One subjob is created per input structure: the structure is written
        to its own Maestro file, a Prime input file is generated from this
        stage's keywords, and the resulting command is queued on JobDJ.
        """
        exe = os.path.join(os.environ['SCHRODINGER'], 'prime')

        # Create a JobDJ instance based on VSW parameters:
        self.jobdj = self.getJobDJ()
        self.prime_jobnames = []

        # Get the filenames of ligand files specified in input #1:
        structure_files = self.getInput(1).getFiles()

        # Snapshot of all stage keywords; STRUCT_FILE is overwritten per job.
        keywords = {key: value for key, value in self.items()}

        # Approximate job times (according to Tyler as of May 2011):
        #
        # "Unfortunately, some of these job types have huge variations in times.
        # SIDE_PRED can take 30 seconds to 2 hours, depending on how many side
        # chains the user selects. There's no default number, and typical use
        # cases vary, so any number would really be arbitrary. The same is true
        # for LOOP_BLD and REAL_MIN. SITE_OPT, if using the typical settings in
        # IFD, probably takes about 5 minutes. ENERGY is probably 30-60
        # seconds, and COVALENT_DOCKING is probably 10 minutes."

        jobnum = 0
        for ligfile in structure_files:
            for st in structure.StructureReader(ligfile):
                jobnum += 1
                prime_jobname = self.genFileName(filenum=jobnum)
                # FIXME use compressed Maestro format for intermediate files.
                stfile = prime_jobname + '.mae'
                st.write(stfile)
                keywords['STRUCT_FILE'] = stfile
                p = primeinput.Prime(keywords)
                p.write(prime_jobname + '.inp')
                cmd = [exe, prime_jobname]
                self.jobdj.addJob(cmd)
                self.prime_jobnames.append(prime_jobname)

    def processJobOutputs(self):
        """
        Checks for failure of any subjobs (by reading the .log file).
        Renames the Prime output files.

        Merges every subjob output structure into a single Maestro file and
        registers it as output #1.

        Raises a RuntimeError if any subjob failed.
        """
        self.checkFiles(self.output_files)

        merged_output_file = self.genOutputFileName(1,
                                                    filenum=1,
                                                    extension=".mae")
        # FIXME use compressed Maestro format.

        writer = structure.StructureWriter(merged_output_file)
        st_num = 0
        for subjob_outfile in self.output_files:
            # Prime may produce more than one structure per subjob:
            for st in structure.StructureReader(subjob_outfile):
                st_num += 1
                writer.append(st)
        writer.close()

        self.checkFile(merged_output_file)

        # Set the output #1 of this stage to the list of renamed output files:
        self.setOutput(1, pipeio.Structures([merged_output_file], st_num))

    def operate(self):
        """
        Perform an operation on the input files.

        There are setup, running, and post-processing steps, and the stage
        records its current status so that it can be restarted in that step
        if there is a failure.

        Raises a RuntimeError if the JobDJ run() method fails, or if the
        stage finishes with an improper status.
        """
        if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
            self.status = "SETTING UP JOBS"
            # Setup JobDJ:
            self.setupJobs()
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()

        if self.status == "RUNNING JOBS":
            # Update JobDJ to correct options (may change if restarting):
            self.setJobDJOptions(self.jobdj)
            self.runJobDJ(self.jobdj)

            self.output_files = []
            # Make sure that all Prime outputs are present:
            for prime_jobname in self.prime_jobnames:
                logfile = prime_jobname + '.log'
                outfile = prime_jobname + '-out.maegz'
                if os.path.isfile(outfile):
                    # Output file exists:
                    self.output_files.append(outfile)
                else:
                    # Output poses file is missing:
                    self.error("Prime output file is missing: %s" % outfile)
                    if not os.path.isfile(logfile):
                        msg = "Both output and log files are missing for job: %s" % prime_jobname
                        raise RuntimeError(msg)
                    # Output file is missing but log file is in place:
                    # Parse the log file to see if Prime produced any
                    # structures:
                    no_poses_warning = False
                    for line in pipeutils.BackwardsReader(logfile):
                        if line == 'ERROR: No valid structure conformations were found.':
                            no_poses_warning = True
                            break
                    if no_poses_warning:
                        # No output poses
                        msg = 'WARNING: Prime produced no output structures for job: %s\n' % prime_jobname
                        self.warning(msg)
                        continue  # Go to the next job
                    # Subjob failed without "no poses" message:
                    msg = pipeutils.get_last_20_lines(logfile)
                    errfile = prime_jobname + '.err'
                    msg += pipeutils.get_last_20_lines(errfile)
                    msg += "PRIME SUBJOB FAILED (no output file)\n"
                    self.exit(msg)

            self.status = "PROCESSING FILES"
            self.dump()

        if self.status == "PROCESSING FILES":
            # Rename the Prime output files:
            self.processJobOutputs()
            self.status = "CLEANING UP"
            self.dump()

        if self.status == "CLEANING UP":
            if not self.getCleanupRequested():
                self.status = "COMPLETE"
                return
            self.info("Cleaning up...")
            archive_files = []
            for jobname in self.prime_jobnames:
                # Remove these files:
                for ext in [".mae", "-out.maegz", "-out.pdb"]:
                    filename = jobname + ext
                    if os.path.isfile(filename):
                        os.remove(filename)
                # Archive these files:
                for ext in [".inp", ".log", ".err"]:
                    filename = jobname + ext
                    if os.path.isfile(filename):
                        archive_files.append(filename)

            if archive_files:
                # There are files to archive:
                # Create an archive for log files:
                tar_file = self.genFileName(extension="_logs.tar.gz")
                log_tar = tarfile.open(tar_file, 'w:gz')
                for filename in archive_files:
                    self.info(' Archiving %s...' % filename)
                    log_tar.add(filename)
                # Close the archive file:
                log_tar.close()
                # Remove the files that were added to the archive:
                for filename in archive_files:
                    os.remove(filename)

        self.status = "COMPLETE"
        self.dump()  # COMPLETE
        return
class MMGBSAStage(stage.Stage):
    """
    Stage for running Prime MM-GBSA on input structures. First structure
    in the input set is assumed to be the receptor.

    There are no keywords specific to the stage. The stage takes one input
    structure file set and generates one set of corresponding output
    structure files (also in PV format).
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.

        All positional and keyword arguments are forwarded to
        `stage.Stage.__init__` along with the keyword specification below.
        """
        # NOTE(review): "OUTPUT_LIG_STRAING" looks like a misspelling of
        # "STRAIN", but the same spelling is read back in setupJobs(), so
        # renaming it would break existing input files — left as-is.
        specs = """
        USE_MAE_CHARGES = boolean(default=False) # Whether to use atom partial charges in input Maestro file in simulations
        OUTPUT_LIG_STRAING = boolean(default=False) # Whether to output an estimate of the ligand strain energy
        USE_MEMBRANE = boolean(default=False) # Use Prime implicit membrane model in protein and complex simulations
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)

        self.jobdj = None
        # Restartable state machine marker; advanced by operate() and
        # persisted via self.dump().
        self.status = "NOT STARTED"

        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

    def mainProduct(self):
        """
        Product string identifying the job type. Used for running different
        job types with different host sets.

        :return: the product name 'prime'.
        """
        return 'prime'

    def setupJobs(self):
        """
        Sets up Prime MM-GBSA jobs, which are distributed via JobDJ. There
        will be one MMGBSA job for each input PV file.

        This method clears the working directory of previous input file
        symlinks, output files, and log files (if any).
        """
        # NOTE: Since MMGBSA is run on XP output in VSW, there will be usually
        # only one input file, so only one master MMGBSA subjob will be
        # created.

        self.jobdj = self.getJobDJ()

        in1 = self.getInput(1)
        self.info('Counting number of structures...')
        self.input_st_num = in1.count() - 1  # Subtract receptor
        if self.input_st_num <= 0:
            raise RuntimeError('ERROR: No ligands present in the input set')

        # Prime-MMGBSA ideal job length range:
        min_job_size = 30  # about 1 hour (assuming 2 min/ligand)
        max_job_size = 500  # about 15 hours?

        self.info('Input number of structures (w/o receptor): %i' %
                  self.input_st_num)

        # Determine the number of subjobs to split the input into:
        (max_st_per_file, njobs, adjusted) = self.getAdjustedNJobs(
            self.input_st_num, min_job_size, max_job_size)
        if adjusted:
            self.info(
                "Adjusting number of jobs to yield %i-%i ligands per job" %
                (min_job_size, max_job_size))

        self.mmgbsa_jobnames = []
        st_in_file = 0
        writer = None
        filenum = 0
        # First structure read across all input files is taken as the
        # receptor and is prepended to every subjob input file.
        receptor = None
        for infile in in1.getFiles():  # Ev:71197 and Ev:72410
            for st in structure.StructureReader(infile):
                # NOTE(review): truthiness test on a Structure; `receptor is
                # None` would be more explicit — confirm Structure truthiness
                # semantics before changing.
                if not receptor:
                    receptor = st
                    continue
                # Start a new subjob file when none is open yet or the
                # current one has reached its ligand quota:
                if not writer or st_in_file == max_st_per_file:
                    filenum += 1
                    jobname = self.genFileName(filenum=filenum)
                    self.mmgbsa_jobnames.append(jobname)
                    writer = structure.StructureWriter(jobname + '.mae')
                    writer.append(receptor)
                    st_in_file = 0
                writer.append(st)
                st_in_file += 1
        # Close the last file:
        writer.close()

        # FIXME use compressed Maestro format for intermediate files (in loops
        # above and below)

        for jobname in self.mmgbsa_jobnames:
            infile = jobname + '.mae'
            outfile = jobname + '-out.maegz'
            logfile = jobname + '.log'
            # Remove previous files:
            for f in [outfile, logfile]:
                if os.path.isfile(f):
                    os.remove(f)
            # Run $SCHRODINGER/prime_mmgbsa:
            cmd = ['prime_mmgbsa', infile]
            cmd.append("-OVERWRITE")  # Fix for Ev:129307
            if self['USE_MAE_CHARGES']:
                cmd.append('-cmae')
            if self['OUTPUT_LIG_STRAING']:
                cmd.append('-lstrain')
            if self['USE_MEMBRANE']:
                cmd.append('-membrane')
            self.jobdj.addJob(cmd)

    def processJobOutputs(self):
        """
        Process the MMGBSA output. Returns with an error if any of the
        output files are missing.

        Merges all subjob outputs into one multi-file structure writer,
        keeping only the first receptor copy (each subjob output leads
        with the receptor), and registers the result as output #1.
        """
        writer = structure.MultiFileStructureWriter(self.genOutputFileName(1),
                                                    extension=".maegz")
        recep_written = False
        for jobname in self.mmgbsa_jobnames:
            logfile = jobname + '.log'
            outfile = jobname + '-out.maegz'
            if not os.path.isfile(logfile):
                self.exit("Missing MMGBSA log file: %s" % logfile)
            for line in open(logfile):
                if "FATAL ERROR" in line:
                    self.exit(line)
            if not os.path.isfile(outfile):
                self.exit("Missing MMGBSA output file: %s" % outfile)
            st_num = 0
            for st in structure.StructureReader(outfile):
                st_num += 1
                if st_num == 1:
                    # First structure of each subjob output is the receptor;
                    # write it only once overall.
                    if not recep_written:
                        writer.append(st)
                        recep_written = True
                else:
                    # Not the receptor:
                    writer.append(st)
        writer.close()
        output_obj = pipeio.Structures(writer.getFiles(),
                                       writer.getNumStructures())
        self.setOutput(1, output_obj)

    def operate(self):
        """
        Perform an operation on the input files.

        There are setup, running, and post-processing steps, and the stage
        records its current status so that it can be restarted in that step
        if there is a failure.

        Raises a RuntimeError if the JobDJ run() method fails, or if the
        stage finishes with an improper status.
        """
        if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
            self.status = "SETTING UP JOBS"
            self.setupJobs()
            self._job_outputs = []
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()

        if self.status == "RUNNING JOBS":
            # Update JobDJ to correct options (may change if restarting):
            self.setJobDJOptions(self.jobdj)
            self.runJobDJ(self.jobdj)
            # Make sure all outputs exist:
            for jobname in self.mmgbsa_jobnames:
                self.checkFile(jobname + '-out.maegz')
            self.status = "PROCESSING FILES"
            self.dump()

        if self.status == "PROCESSING FILES":
            # NOTE(review): unlike PrimeStage.operate(), self.status is never
            # set to "COMPLETE" here and no final dump() is made — confirm
            # whether the framework tolerates finishing in this state.
            self.processJobOutputs()
            return  # COMPLETE
        return
# EOF