# Source code for schrodinger.pipeline.stages.semiemp

"""
Stage for running semi-empirical jobs.

Input structure file - maestro file of one or more ligands (it does not
                       yet have a QM/MM mode for processing complexes).

Copyright Schrodinger, LLC. All rights reserved.

"""
# Contributors: Jeff Saunders

# Written for QPLD (Ev:85397).

import os

from schrodinger import structure
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage
from schrodinger.utils import fileutils
from schrodinger.application.mopac.results71 import MOPAC71

STATUS_NOT_STARTED = "NOT STARTED"
STATUS_PREPARING_INPUTS = "PREPARING INPUT FILES"
STATUS_SETTING_UP = "SETTING UP JOBS"
# If restarting, go directly to next step.
STATUS_RUNNING_JOBS = "RUNNING JOBS"
STATUS_PROCESSING_FILES = "PROCESSING FILES"
STATUS_COMPLETE = "COMPLETE"


class SemiEmpStage(stage.Stage):
    """
    Stage for running semi-empirical jobs.

    Input is a structure file of one or more ligands.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the stage instance; <args> and <kwargs> are forwarded to
        the stage.Stage constructor.
        """
        # ConfigObj specification for the keywords this stage accepts:
        specs = """
            GEOPT = boolean(default=True) # Run geometry optimization
            METHOD = string(default="MNDO") # Semi-empirical method to use
            SEMIEMP_OPTS = string(default=None) # Additional keywords
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)

        # Input pin #1 must be of type "structures" and is required:
        self.addExpectedInput(1, "structures", True)
        # Output pin #1 must be of type "structures" and is always produced:
        self.addExpectedOutput(1, "structures", True)

        self.status = STATUS_NOT_STARTED
        self.jobdj = None
        self.semiemp_jobnames = []
[docs] def recombineInputLigands(self): """ Split/recombine the input ligand files into the desired number of subsets/subjobs. """ min_job_size = 100 max_job_size = 50000 # Get the file paths for the input pin #1: input_files = self.getInput(1).getFiles() # Get the number of structures and subjobs st_count = 0 for input_file in input_files: st_count += structure.count_structures(input_file) nst_per_file, njobs, adjusted = self.getAdjustedNJobs( st_count, min_job_size, max_job_size) self.info(" Number of structures: %i" % st_count) self.info(" Number of subjobs: %i" % njobs) if adjusted: self.info(" Adjusted to yield %i-%i ligands per job" % (min_job_size, max_job_size)) # Write the subjob structure files writer = structure.MultiFileStructureWriter(self.genFileName(), extension=".maegz", sts_per_file=nst_per_file) reader = structure.MultiFileStructureReader(input_files) for st_num, st in enumerate(reader, start=1): # Print a progress period every 1000 structures (about once every # few seconds) if st_num % 1000 == 0: self.lognoret(".") if st.property.get("b_glide_receptor"): # Skip receptors in PV files continue writer.append(st) writer.close() self.recombined_ligands = writer.getFiles() self.checkFiles(self.recombined_ligands)
[docs] def setupJobs(self): """ Sets up the semi-empirical jobs, which are distributed via JobDJ. """ # Create a JobDJ instance based on Pipeline parameters self.jobdj = self.getJobDJ() self.semiemp_jobnames = [] for filenum, ligfile in enumerate(self.recombined_ligands, start=1): semiemp_jobname = self.genFileName(filenum=filenum) # Make up the command string based on the keywords of this stage. # (This stage is an InputConfig (ConfigObj) instance) cmd = ["run", "semi_emp.py", f"-{MOPAC71}"] if not self['GEOPT']: cmd.append("-nogeopt") cmd.extend(("-method", self['METHOD'])) if self['SEMIEMP_OPTS']: cmd.extend(("-keywords", self['SEMIEMP_OPTS'])) cmd.append(ligfile) self.jobdj.addJob(cmd) self.semiemp_jobnames.append(semiemp_jobname)
[docs] def processJobOutputs(self): """ Renames the semi-empirical output files and does a final check for their existence. """ final_output_files = [] for filenum, semiemp_jobname in enumerate(self.semiemp_jobnames, start=1): # Have the backend write output in compressed format? outfile = semiemp_jobname + '_out.mae' # Find out what the output file should be called. renamed_outfile = self.genOutputFileName(1, filenum=filenum, extension=".mae") # Rename the SemiEmp output file. if os.path.exists(renamed_outfile): fileutils.force_remove(renamed_outfile) # For Win fileutils.force_rename(outfile, renamed_outfile) final_output_files.append(renamed_outfile) self.checkFiles(final_output_files) # Set the output #1 of this stage to the list of renamed output files: self.setOutput(1, pipeio.Structures(final_output_files))
[docs] def operate(self): """ The only overridden & required method in this class. Called by the Pipeline to run this stage's main code. If the stage has crashed, Pipeline will restart it from the last dump() point and call this method again. Perform the operation on the input files. There are preparing, setup, running, and post-processing steps, and the stage records its current status so that it can be restarted in that step if there is a failure. Raises a RuntimeError if the JobDJ run() method fails, or if the stage finishes with an improper status. """ if (self.status == STATUS_NOT_STARTED or self.status == STATUS_PREPARING_INPUTS): # Split the input into the desired number of subjobs self.status = STATUS_PREPARING_INPUTS self.recombineInputLigands() self.status = STATUS_SETTING_UP self.dump() if self.status == STATUS_SETTING_UP: # Setup JobDJ: self.setupJobs() # If restarting, go directly to next step. self.status = STATUS_RUNNING_JOBS self.dump() if self.status == STATUS_RUNNING_JOBS: # Update JobDJ to correct options (may change if restarting): self.setJobDJOptions(self.jobdj) self.runJobDJ(self.jobdj) # Make sure that all SemiEmp outputs are present. This must be done # within the STATUS_RUNNING_JOBS stage so the job can be restarted # in this stage if there's a failure. for semiemp_jobname in self.semiemp_jobnames: logfile = semiemp_jobname + '.log' # Have the backend write output in compressed format? outfile = semiemp_jobname + '_out.mae' # Unfortunately, there is an .out file for each input # structure, so we really can't check each effectively. 
if not os.path.isfile(logfile): msg = "ERROR: Semi-empirical subjob log file '%s' is missing\n" % logfile self.exit(msg) elif not os.path.isfile(outfile): msg = "ERROR: Semi-empirical subjob output structure file '%s' is missing\n" % outfile msg += pipeutils.get_last_20_lines(logfile) msg += "ERROR: Semi-empirical subjob output structure file is missing\n" self.exit(msg) self.status = STATUS_PROCESSING_FILES self.dump() if self.status == STATUS_PROCESSING_FILES: # Rename the SemiEmp output files: self.processJobOutputs() self.status = STATUS_COMPLETE self.dump() # If we reached at this point, then the stage has completed. return