# Source code for schrodinger.pipeline.stages.ligprep

# python module
"""
Stages for running LigPrep.

    LigPrepStage - runs LigPrep on input structures
    PostLigPrepStage - limits stereos, removes penalized states

Copyright Schrodinger, LLC. All rights reserved.

"""

# Contributors: Matvey Adzhigirey

import gzip  # For compress()
import os
import shutil
import sys
import tarfile  # For cleanup

from schrodinger import structure
from schrodinger.infra import mm  # noqa: F401 For exceptions
from schrodinger.pipeline import compiled_functions
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage
from schrodinger.utils import fileutils
from schrodinger.utils.fileutils import MAESTRO
from schrodinger.utils.fileutils import SD
from schrodinger.utils.fileutils import SMILES
from schrodinger.utils.fileutils import SMILESCSV


def compress(origFile):
    """
    Compress the specified file with gzip and remove the original.

    :param origFile: Path to the file to compress.
    :type origFile: str

    :return: Path of the compressed file. For a ".mae" input the result
        ends in ".maegz"; otherwise ".gz" is appended to the name.
    :rtype: str
    """
    if origFile.endswith('.mae'):
        compFile = origFile + 'gz'  # To produce .maegz
    else:
        compFile = origFile + '.gz'
    # Copy in binary mode; GzipFile opened with mode='w' accepts only bytes,
    # so iterating a text-mode handle and writing str lines would raise a
    # TypeError under Python 3. copyfileobj also avoids line-splitting and
    # handles closing via the context managers.
    with open(origFile, 'rb') as orig_handle:
        with gzip.GzipFile(compFile, mode='w') as gzip_handle:
            shutil.copyfileobj(orig_handle, gzip_handle)
    os.remove(origFile)
    return compFile
def replace_arg_value(cmd, arg, value, add=True):
    '''
    Replaces value of a command line argument, modifying `cmd` in place.

    :param cmd: List of command line arguments.
    :type cmd: list(str)

    :param arg: Argument name.
    :type arg: str

    :param value: Argument value.
    :type value: str

    :param add: Add the argument if not present?
    :type add: bool
    '''
    if arg in cmd:
        value_pos = cmd.index(arg) + 1
        # The flag is present; overwrite its existing value (only if the
        # flag is actually followed by one).
        if value_pos < len(cmd):
            cmd[value_pos] = value
    elif add:
        # Flag missing from the command; append it with its value.
        cmd.extend([arg, value])
class LigPrepStage(stage.Stage):
    """
    Pipeline stage for running LigPrep on the input structures
    """

    def __init__(self, *args, **kwargs):
        """
        Create the stage.

        Declares the configobj keyword specification for the stage, one
        structures input/output pair, and the required products, then
        initializes the bookkeeping attributes used across restarts.
        """
        # configobj specification; inline "#" text becomes keyword help:
        specs = """
        UNIQUEFIELD = string(default="NONE") # Field to identify unique compound by
        RETITLE = boolean(default=False) # Whether to set OUTCOMPOUNDFIELD
        OUTCOMPOUNDFIELD = string(default="s_m_title") # Field where to store the compound codes (if RETITLE is True).
        STEREO_SOURCE = string(default="parities") # parities/geometry
        USE_EPIK = boolean(default=False) # Whether to use Epik instead of Ionizer.
        METAL_BINDING = boolean(default=False) # Use Epik metal binding mode.
        RETAIN = boolean(default=True) # Retain input variant for each compound.
        PH = float(default=7.0) # Target pH
        PHT = float(default=2.0) # pH threshold
        IONIZE = boolean(default=True) # Whether to ionize (Ionizer; Epik always ionizes).
        GENERATE_TAUTOMERS = boolean(default=True) # Whether to generate tautomers.
        MAX_TAUTOMERS = integer(default=8) # Maximum number of tautomers to generate (Ionizer).
        NEUTRALIZE = boolean(default=False) # Whether to neutralize before expanding states.
        MAX_STATES = integer(default=16) # Maximum number of states to generate (Epik).
        NUM_STEREOISOMERS = integer(default=32) # Generate this many stereoisomers per protonation state.
        MAX_STEREOISOMERS = integer(default=None) # Keep this many low-energy stereoisomers per protonation state.
        NRINGCONFS = integer(default=1) # Ring conformers per ligand
        MIXLIGS = boolean(default=False)
        RECOMBINE = boolean(default=True) # Whether to recombine input ligand files
        COMBINEOUTS = boolean(default=False) # Combine output files
        SKIP_BAD_LIGANDS = boolean(default=True)
        REGULARIZE = boolean(default=False) # Whether to standardize input structures before preparing
        SKIP_NOUNIQUE_LIGANDS = boolean(default=False) # Whether to skip ligands that have no unique field.
        TAUT_SPEC_FILE = string(default=None) # Custom tautomerizer spec file to use.
        NORMALIZE = boolean(default=False) # Whether to normalize input variants
        OUTFORMAT = string(default="mae")
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)

        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

        self.setMainProduct('ligprep')
        self.requiredProduct('macromodel')

        self.input_files = []  # Files from the stage's input object
        self.recombined_files = []  # Per-subjob input files
        self.ligprep_jobnames = []  # One jobname per subjob
        self.output_files = []  # Files registered as stage output
        self.jobdj = None  # JobDJ instance created in setupJobs()
        self.status = "NOT STARTED"  # Drives the restartable state machine
        self.px = None  # LIGPREP-1732
    def initNonPersistent(self, pipeline):
        """
        Initialize per-run state that is not persisted on dump/restore.

        :param pipeline: The pipeline that owns this stage; its `stagejobs`
            attribute is passed to the wrapper below.
        """
        # value of the `px` attribute gets passed to
        # ligprep '-px' argument to skip ligprep license checkouts
        self.px = compiled_functions.pipeline_init_wrapper(pipeline.stagejobs)
    def recombineInputLigands(self):
        """
        Read all input structures, while optionally skipping bad structures,
        and save each structure into a subjob input file. Optionally
        structures may be retitled where the compound ID of a structure is
        determined either from UNIQUEFIELD property, or each structure is
        assumed to be its own compound (no variants). Structures can also be
        mixed within subjobs in order to make subjob lengths more uniform,
        if MIXLIGS is set to True.

        Side effects: sets `self.input_files`, `self.recombined_files`, and
        (when recombining) `self.input_st_num`; writes "-in.maegz" subjob
        input files to the working directory.
        """
        self.input_files = self.getInput(1).getFiles()

        if not self['RECOMBINE']:  # No recombine (Ev:65662)
            # Keywords below only make sense when recombining; warn and
            # ignore them:
            for keyword in [
                    'RETITLE', 'MIXLIGS', 'SKIP_BAD_LIGANDS',
                    'SKIP_NOUNIQUE_LIGANDS'
            ]:
                if self[keyword]:
                    self.warning(
                        "WARNING: RECOMBINE is False; ignoring %s keyword" %
                        keyword)
            if self['UNIQUEFIELD'] != 's_m_title':
                self.warning(
                    "WARNING: RECOMBINE is False; ignoring UNIQUEFIELD keyword")
            self.info("Will NOT recombine or retitle input ligands")
            # One subjob per input file:
            self.recombined_files = self.getInput(1).getFiles()
            self.info("There will be %i subjobs." % len(self.recombined_files))
            return

        # Here and down is RECOMBINATION:

        if hasattr(self, 'input_st_num'):  # already counted
            # Already calculated (e.g. when restarting a dumped stage):
            self.info("Number of input structures was already calculated")
        else:
            # FIXME If input number of structures is know, use it
            self.info("Counting number of input structures")
            input_st_num = 0
            for filename in self.input_files:
                try:
                    input_st_num += structure.count_structures(filename)
                except:
                    self.exit("ERROR Could not get st count for file: " +
                              filename)
            self.input_st_num = input_st_num
            self.dump()  # Save the instance
        self.info("Number of input structures: %i" % self.input_st_num)

        # These values are used to create ideal job size (if the Pipeline
        # driver requests it):
        min_job_size = 2000  # About 1 hour
        max_job_size = 20000  # About 10 hours

        # Determine the number of subjobs to split the ligand set into:
        # (NOTE: Adjustment is being done only if VSW driver requests it)
        (n_st_per_file, njobs,
         adjusted) = self.getAdjustedNJobs(self.input_st_num, min_job_size,
                                           max_job_size)
        if adjusted:
            self.info(
                "Adjusting number of jobs to yield %i-%i ligands per job" %
                (min_job_size, max_job_size))

        uniquefield = self['UNIQUEFIELD']
        if self['RETITLE']:
            if (uniquefield == "NONE"):
                self.info(
                    "Each input structure will be treated as a unique compound."
                )
            else:
                self.info("Field to identify unique compounds: " + uniquefield)

        mixligs = self['MIXLIGS']
        if njobs == 1:
            # Nothing to mix across a single subjob:
            mixligs = False

        self.info(
            "Separating ligands into %i subjobs (%i max ligands per job)." %
            (njobs, n_st_per_file))
        if mixligs:
            self.info("Mixing ligands to uniformize subjob lengths")
        else:
            self.info("Not mixing ligands")

        self.info("\nRecombining ligand files...")

        # Writers are created lazily, keyed by subjob file number:
        recombined_writers_dict = {}
        sts_in_file = 0  # needed if not mixing
        filenum = 1
        dp = pipeutils.DotPrinter(self.input_st_num)

        if self['REGULARIZE']:
            # Get a shared Canvas license required for 2D SD file generation:
            from schrodinger.application.canvas import utils
            utils.get_license(license_type=utils.LICENSE_SHARED)
            import schrodinger.structutils.smiles as smiles
            smiles_stereo = smiles.STEREO_FROM_ANNOTATION_AND_GEOM
            smiles_generator = smiles.SmilesGenerator(smiles_stereo,
                                                      unique=True)

        num_skipped_sts = 0
        st_num = 0
        for inligfile in self.input_files:
            ok_sts_in_file = False
            iformat = fileutils.get_structure_file_format(inligfile)
            self.debug("\nReading file: %s (format %s)" % (inligfile, iformat))
            # Will open unknown files as SD:
            sts = pipeutils.get_reader(inligfile, sd_for_unknown=True)

            while True:  # Iterate until StopIteration:
                try:
                    st = next(sts)
                except StopIteration:
                    break  # EOF, exit
                except Exception:
                    # Bad structure
                    if self['SKIP_BAD_LIGANDS']:
                        num_skipped_sts += 1
                        st_num += 1
                        continue  # Skip the bad structure
                    else:
                        msg = "ERROR: Could not read next structure from file: %s" % inligfile
                        msg += "Try setting SKIP_BAD_LIGANDS to True"
                        self.exit(msg)
                else:  # Read OK
                    st_num += 1
                    dp.dot()

                if iformat in [SMILES, SMILESCSV]:
                    # Convert SMILES pattern to a regular Structure object:
                    try:
                        st = st.get2dStructure()
                    except Exception as err:
                        if self['SKIP_BAD_LIGANDS']:
                            self.debug(str(err))  # full error message
                            self.warning(
                                'WARNING: Structure %i: Could not read SMILES "%s". Skipping.'
                                % (st_num, st.smiles))
                            num_skipped_sts += 1
                            st_num += 1
                            continue  # Skip the bad structure
                        else:
                            self.warning(str(err))  # full error message
                            self.exit(
                                'ERROR: Structure %i: Could not read SMILES "%s". Exiting.'
                                % (st_num, st.smiles))

                # Normalize by converting to a unique SMILES string first:
                if self['REGULARIZE']:
                    # NOTE: If input is SMILES-formatted, this will result in
                    # SMILES->CT->SMILES->CT conversion
                    # FIXME Perhaps instead we should simply uniquify the
                    # SMILES string and then convert to CT once?
                    try:
                        pattern = smiles_generator.getSmiles(st)
                    except Exception:
                        if self['SKIP_BAD_LIGANDS']:
                            self.warning(
                                "WARNING: Structure %i: Could not generate SMILES (for regularization)"
                                % st_num)
                            num_skipped_sts += 1
                            st_num += 1
                            continue  # Skip the bad structure
                        else:
                            msg = "ERROR: Structure %i: Could not generate SMILES (for regularization)" % st_num
                            msg += "Try setting SKIP_BAD_LIGANDS to True"
                            self.exit(msg)

                    # Convert the SMILES to a Structure object:
                    try:
                        st = structure.SmilesStructure(
                            pattern, st.property).get2dStructure()
                    except Exception as err:
                        if self['SKIP_BAD_LIGANDS']:
                            self.debug(str(err))
                            self.warning(
                                "WARNING: Structure %i: Could not convert SMILES to a CT. Skipping."
                                % st_num)
                            num_skipped_sts += 1
                            st_num += 1
                            continue  # Skip the bad structure
                        else:
                            self.debug(str(err))
                            self.exit(
                                "ERROR: Structure %i: Could not convert SMILES to a CT. Exiting."
                                % st_num)

                if self['RETITLE']:
                    if uniquefield == 'NONE':
                        # Each structure is its own compound:
                        compound_id = str(st_num)
                    else:
                        try:
                            # Ev:96648 & Ev: 104439
                            compound_id = pipeutils.read_unique_field(
                                st, uniquefield)
                        except KeyError:
                            msg = "ERROR: File %s; ligand %i is missing property: %s" % (
                                os.path.basename(inligfile), st_num,
                                uniquefield)
                            if self['SKIP_NOUNIQUE_LIGANDS']:
                                self.warning(msg)
                                continue  # Go to next input ligand
                            else:
                                self.exit(msg)

                    if self['OUTCOMPOUNDFIELD'] == 's_m_title':
                        if iformat not in [SMILES, SMILESCSV]:
                            st.property["s_pipeline_title_backup"] = st.title
                        # Will work for SmilesStructures also
                        st.title = compound_id
                    else:
                        # FIXME will not work for SmilesStructures:
                        st.property[self['OUTCOMPOUNDFIELD']] = compound_id

                if filenum not in recombined_writers_dict:
                    outfile = self.genFileName(filenum=filenum,
                                               extension="-in.maegz")
                    writer = structure.StructureWriter(outfile)
                    writer.filename = outfile
                    recombined_writers_dict[filenum] = writer
                recombined_writers_dict[filenum].append(st)
                ok_sts_in_file = True

                # Figure out what file the next structure should be written to:
                if mixligs:
                    # Round-robin across subjob files:
                    filenum += 1
                    if filenum > njobs:
                        filenum = 1
                else:
                    # Fill the current file to capacity, then move on:
                    sts_in_file += 1
                    if sts_in_file >= n_st_per_file:
                        filenum += 1
                        sts_in_file = 0

            if not ok_sts_in_file:
                self.warning(
                    'NOTE: Supported SMILES files should have one pattern per line (no header row), with optional space followed by the title/name.'
                )
                self.exit(
                    "\nERROR: Ligand file did not contain any readable structures: "
                    + inligfile)

        print("")
        if num_skipped_sts:
            self.warning(
                "%i structures were skipped because they were invalid." %
                num_skipped_sts)
        num_read_ok = (st_num - num_skipped_sts)
        self.info("%i structures were successfully read." % num_read_ok)

        # Close file handles:
        recombined_files = []
        for writer in recombined_writers_dict.values():
            recombined_files.append(writer.filename)
            writer.close()
        self.recombined_files = recombined_files
        self.info('')
    def setupJobs(self):
        """
        Setup LigPrep subjobs by building up a command to submit, and
        passing that command to JobDJ via addJob().

        Side effects: sets `self.jobdj`, `self.ligprep_jobnames`, and
        `self.expected_outputs_by_jobname` (jobname -> expected output file).
        """
        # FIXME: MAKE IT SO THAT ALL VARIANTS OF BAD ROOT ARE REMOVED???
        self.jobdj = self.getJobDJ()
        self.ligprep_jobnames = []
        self.expected_outputs_by_jobname = {}

        taut_specfile = None
        if self['TAUT_SPEC_FILE']:
            taut_specfile = self['TAUT_SPEC_FILE']
            self.info("Using custom tautomer spec file: %s" % taut_specfile)
        else:
            self.info("Using built-in tautomer spec file")

        if self['NORMALIZE']:
            self.info("Will normalize input variants (no expansion)")
            # These keywords conflict with normalization; warn the user:
            if self['RETAIN']:
                self.warning(
                    "WARNING RETAIN keyword not supported for NORMALIZE job")
            if not self['NEUTRALIZE']:
                self.warning(
                    "WARNING NEUTRALIZE False ignored for NORMALIZE job")
            if not self['GENERATE_TAUTOMERS']:
                self.warning(
                    "WARNING GENERATE_TAUTOMERS False ignored for NORMALIZE job"
                )
        else:
            self.info("Will generate new variants (expand)")

        for fileindex, ligfile in enumerate(self.recombined_files):
            jobname = self.genFileName(filenum=fileindex + 1)

            #ligprep = os.path.join(os.environ['SCHRODINGER'],'ligprep')
            # Fill automatically find $SCHRODINGER/ligprep:
            cmd = ['ligprep']
            cmd.append('-kp')  # Keep properties

            informat = fileutils.get_structure_file_format(ligfile)
            if informat is None and ligfile.endswith('.csv'):
                # Unrecognized .csv files are assumed to be SMILES-CSV:
                informat = SMILESCSV

            if informat == SMILES:
                cmd += ['-ismi', ligfile]
            elif informat == SMILESCSV:
                cmd += ['-ismi', ligfile]
            elif informat == MAESTRO:
                cmd += ['-imae', ligfile]
            elif informat == SD:
                cmd += ['-isd', ligfile]
            else:
                self.exit("ERROR Invalid file format: %s" % ligfile)

            outformat = self['OUTFORMAT']
            # When NORMALIZING, output files are always merged anyway:
            # SD (Ev:71599)
            if self['NORMALIZE']:
                # Can't have informat and outformat both be SD:
                outformat = 'mae'

            if outformat == 'mae':
                outfile = jobname + '.maegz'
                cmd += ['-omae', outfile]
            elif outformat == 'sd':
                outfile = jobname + '.sdf'
                cmd += ['-osd', outfile]
            else:
                self.exit("ERROR: Unsupported output format: %s" % outformat)
            self.expected_outputs_by_jobname[jobname] = outfile

            if self['NORMALIZE']:
                # Normalize input variants (generates one per structure)
                if informat == 'sd':
                    cmd += ['-R', 'c']
                # New protocol: Use Epik and keep first stage (lowest penalty):
                cmd += ['-R', 'd', '-R', 'h', '-R', 'n', '-R', 'e']
                # NOTE no need for -lab option since we will not be running
                # PostLigPrepStage
                cmd.append('-lab')  # Label structures
            else:
                # Expand (generate variants)
                if self['RETAIN']:
                    cmd += ['-retain']
                cmd.append('-lab')  # Label structures

                # LigPrep options:
                # -kp = keep properties
                # -lab = put labels on structures -> needed for limit stereo stage
                # -retain_it = keep input ionization and tautomeric states
                # -r = maximum number of ring conformations to generate per ligand

                ph = self['PH']
                pht = self['PHT']

                if self['USE_EPIK']:
                    ms = self['MAX_STATES']  # Default for Epik is 16
                    cmd += [
                        '-epik', '-W',
                        'e,-ph,%f,-pht,%f,-ms,%i' % (ph, pht, ms)
                    ]
                else:
                    cmd += ['-W', 'i,-ph,%f,-pht,%f' % (ph, pht)]

                # NOTE: Anytime IONIZE is True, neutralizing is automatically
                # performed.
                if self['IONIZE']:
                    cmd += ['-i', '2']
                else:
                    # Not ionizing
                    # NEEDED? if self['USE_EPIK']:
                    # NEEDED?     self.warning("WARNING: IONIZE False ignored
                    #              because using Epik")
                    if self['NEUTRALIZE']:
                        cmd += ['-i', '1']
                    else:
                        cmd += ['-i', '0']

                if self['METAL_BINDING']:
                    if not self['USE_EPIK']:
                        self.exit(
                            "ERROR: METAL_BINDING mode is supported only with Epik."
                        )
                    cmd.append('-epik_metal_binding')

                if self['GENERATE_TAUTOMERS']:
                    max_tauts = self['MAX_TAUTOMERS']
                    cmd += ['-t', str(max_tauts)]
                else:
                    cmd.append('-nt')

                generate_stereos = self['NUM_STEREOISOMERS']
                cmd += ['-s', str(generate_stereos)]

                max_stereos = self.get('MAX_STEREOISOMERS')
                if max_stereos is not None:
                    if generate_stereos < max_stereos:
                        self.exit(
                            'MAX_STEREOISOMERS can not be greater than NUM_STEREOISOMERS'
                        )
                    cmd += ['-m', str(max_stereos)]
                    # NOTE: When running an older VSW input file, it's possible
                    # that results will change, as previously stereo limiting
                    # was done by PostLigPrepStage and keyword was MAXSTEREO.

                if taut_specfile:
                    cmd += ['-ts', taut_specfile]

                if self['STEREO_SOURCE'] == 'geometry':
                    cmd.append('-g')

            if not self.getCleanupRequested():
                cmd.append('-nc')

            self.debug("ADDING COMMAND: %s" % cmd)
            self.debug(" as string: %s" % " ".join(cmd))
            self.jobdj.addJob(cmd)
            self.ligprep_jobnames.append(jobname)
[docs] def missingOutput(self, logfile): """ Returns True if logfile is present and shows that no structures where returned by LigPrep. """ with open(logfile) as fh: for line in fh: if "No structures found in" in line: return True return False
    def processJobOutputs(self):
        """
        After all LigPrep subjobs are complete, read each output file and
        combined them into one file, or a few files with up to 100K
        structures in each file.

        Structures are NOT combined if COMBINEOUTS is set to False. Instead
        each file is renamed in order to have a Pipeline-compliant name.

        Side effects: populates `self.output_files` and registers the
        stage's output via setOutput().
        """
        self.output_files = []

        if (self['COMBINEOUTS'] and
                len(self.ligprep_jobnames) > 1) or self['NORMALIZE']:
            # NOTE: If normalizing, we always need to keep first output state
            # per compound
            self.info("Combining output files...")
            combined_outfile = None
            sts_in_file = 0
            st_num = 0
            outformat = self['OUTFORMAT']
            if outformat == 'sd':
                ext = '.sdf'
            else:  # Assume Maestro
                ext = '.maegz'
            filenum = 0
            for jobname in self.ligprep_jobnames:
                filenum += 1
                outfile = self.expected_outputs_by_jobname[jobname]
                if outfile not in self.ligprep_output_files:
                    # This subjob did not produce output (OK)
                    self.warning(
                        "WARNING LigPrep subjob did not produce output: %s" %
                        jobname)
                    continue
                # Try to read as Textual Structure only if will be writing to
                # Mae format:
                for st in pipeutils.get_reader(outfile,
                                               astext=(outformat == 'mae')):
                    st_num += 1
                    # Progress dot every 1000 structures:
                    if st_num % 1000 == 0:
                        sys.stdout.write(".")
                        sys.stdout.flush()
                    if not combined_outfile:
                        # Lazily open the combined output file:
                        combined_outfile = self.genOutputFileName(
                            1, filenum=filenum, extension=ext)
                        self.output_files.append(combined_outfile)
                        sts_in_file = 0
                    # First structure creates the file; the rest append:
                    if sts_in_file == 0:
                        st.write(combined_outfile)
                    else:
                        st.append(combined_outfile)
                    sts_in_file += 1
            self.setOutput(1, pipeio.Structures(self.output_files, st_num))
        else:
            # Do NOT combine; rename the files:
            filenum = 0
            for jobname in self.ligprep_jobnames:
                filenum += 1
                outfile = self.expected_outputs_by_jobname[jobname]
                if outfile not in self.ligprep_output_files:
                    self.warning(
                        "WARNING LigPrep subjob did not produce output: %s" %
                        jobname)
                    # This subjob did not produce output (OK)
                    continue
                # Since just renaming the file, keep the same extension:
                ext = fileutils.splitext(outfile)[1]
                newname = self.genOutputFileName(1,
                                                 filenum=filenum,
                                                 extension=ext)
                shutil.move(outfile, newname)
                self.output_files.append(newname)
            self.setOutput(1, pipeio.Structures(self.output_files))
    def checkProducts(self):
        """
        Overrides the checkProducts() method of Stage class. If USE_EPIK
        is True, adds 'epik' as a required product before the actual check
        is performed. Will get called after the instance of this stage
        is created.
        """
        if self['USE_EPIK']:
            self.requiredProduct('epik')
    def operate(self):
        """
        Run the stage on the input structure file set.

        Implements a restartable state machine driven by `self.status`:
        PREPARING INPUT FILES -> SETTING UP JOBS -> RUNNING JOBS ->
        PROCESSING FILES -> (CLEANING UP) -> COMPLETE. The instance is
        dumped after each transition so a restarted run resumes where it
        left off.
        """
        if self.status == "NOT STARTED" or \
                self.status == "PREPARING INPUT FILES":
            self.status = "PREPARING INPUT FILES"
            self.recombineInputLigands()
            self.status = "SETTING UP JOBS"
            self.dump()

        if self.status == "SETTING UP JOBS":
            self.setupJobs()
            self.status = "RUNNING JOBS"
            self.dump()

        if self.status == "RUNNING JOBS":
            self.debug("DEBUG JobDJ Options: %s" % self.JobDJOptions())
            # In case restarting, update JobDJ to new options:
            self.setJobDJOptions(self.jobdj)

            if self.px:
                # Make sure that the "-px" argument of the jobs managed by
                # jobdj is up to date (see LIGPREP-1732, PHASE-2241).
                for job in self.jobdj.all_jobs:
                    if not job.isComplete():
                        replace_arg_value(job._command, '-px', self.px)

            # Ev:64286 if restarting, do not restart jobs that have output
            # files:
            if self.jobdj.hasStarted():
                for i, job in enumerate(self.jobdj.all_jobs):
                    if job.isComplete():
                        continue
                    subjobname = self.ligprep_jobnames[i]
                    outfile = self.expected_outputs_by_jobname[subjobname]
                    logfile = subjobname + '.log'
                    if os.path.isfile(outfile):
                        print(' Output file exists for the job:', outfile)
                        print(' Marking %s as completed' % subjobname)
                        job._markFinished('found-output')
                    elif os.path.isfile(logfile):
                        # No output, but the log may show the job legitimately
                        # produced no structures:
                        if self.missingOutput(logfile):
                            print(' Job finished; LigPrep produced no output')
                            print(' Marking %s as completed' % subjobname)
                            job._markFinished('found-output')

            self.info("Running LigPrep jobs...")
            self.runJobDJ(self.jobdj)
            self.info("Checking subjob outputs...")

            # All jobs completed
            # Make sure that all LigPrep subjobs procued output:
            self.ligprep_output_files = []
            some_failed = False
            for i, job in enumerate(self.jobdj.all_jobs):
                subjobname = self.ligprep_jobnames[i]
                outfile = self.expected_outputs_by_jobname[subjobname]
                logfile = subjobname + '.log'
                if os.path.isfile(outfile):
                    self.ligprep_output_files.append(outfile)
                else:
                    if not os.path.isfile(logfile):
                        self.error("ERROR: LigPrep log file missing: %s" %
                                   logfile)
                        self.jobdj.markForRestart(job, "nolog")
                        some_failed = True
                    elif not self.missingOutput(logfile):
                        # Output file is missing and log file shows that it
                        # should exist:
                        self.error("ERROR: LigPrep output file missing: %s" %
                                   outfile)
                        self.jobdj.markForRestart(job, "noout")
                        some_failed = True

            if some_failed:
                self.dump()
                self.exit("Some jobs failed. Try restarting. Exiting...")

            self.status = "PROCESSING FILES"
            self.dump()

        if self.status == "PROCESSING FILES":
            self.processJobOutputs()
            if self.getCleanupRequested():
                self.status = "CLEANING UP"
            else:
                self.status = "COMPLETE"
            self.dump()

        if self.status == "CLEANING UP":
            # Clean up after myself by compressing/removing some files
            self.info("Cleaning up...")
            archive_files = []
            # FIXME: Put this code into stage.py to make it universal:
            for jobname in self.ligprep_jobnames:
                # Remove these files:
                for ext in ['-in.maegz', '-in.mae', '-in.smi']:
                    filename = jobname + ext
                    if os.path.isfile(filename):
                        fileutils.force_remove(filename)
                # Archive these files:
                for ext in [
                        '_bm.log', '_ds.log', '_ht.log', '_ion_detail.log',
                        '_ion.log', '_lb.log', '_nu.log', '_pr.log',
                        '_rc.log', '_st.log', '_ta.log'
                ]:
                    filename = jobname + ext
                    if os.path.isfile(filename):
                        archive_files.append(filename)
                # Compress these files individually:
                for ext in ['_ionout-bad.mae']:
                    filename = jobname + ext
                    if os.path.isfile(filename):
                        self.info('Compressing %s...' % filename)
                        compress(filename)

            if archive_files:
                # There are files to archive:
                # Create an archive for log files:
                tar_file = self.genFileName(extension="_logs.tar.gz")
                log_tar = tarfile.open(tar_file, 'w:gz')
                for filename in archive_files:
                    self.info('Archiving %s...' % filename)
                    log_tar.add(filename)
                # Close the archive file:
                log_tar.close()
                # Remove the files that were added to the archive:
                for filename in archive_files:
                    fileutils.force_remove(filename)

            self.status = "COMPLETE"
            self.dump()

        # COMPLETE
        return
class PostLigPrepStage(stage.Stage):
    """
    This stage limits the number of steroisomers, removes states penalized
    by Epik/Ionizer, and generates the variant codes.

    PRESERVE_NJOBS whether to write out as many output files as there are
    inputs. Useful to create the same number of subjobs for QikProp as were
    used for LigPrep. By default, a single output file is created.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the stage: declare its configobj keyword specification and
        one structures input/output pair.
        """
        specs = """
        LIMIT_STEREOISOMERS = boolean(default=False) # LigPrepStage does stereo limiting now.
        MAXSTEREO = integer(default=4) # Ignored by default now
        REMOVE_PENALIZED_STATES = boolean(default=True)
        UNIQUEFIELD = string(default="s_m_title")
        OUTVARIANTFIELD = string(default="s_vsw_variant")
        OUTFORMAT = option("sd", "mae", default="mae")
        PRESERVE_NJOBS = boolean(default=False)
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)

        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

        # Set from MAXSTEREO at the start of operate():
        self.maxstereo = None
    def writeBestStereos(self, stereoisomers):
        """
        Given a list of Structure objects from the same ion/taut state,
        decides which structures (stereoisomers) need to be kept and which
        ones need to be removed. This is done by keeping MAXSTEREO lowest-
        energy stereoisomers. The structures that are "kept" are then
        written to the output file.

        NOTE: when limiting, the kept structures are popped out of the
        passed-in `stereoisomers` list (it is mutated in place).

        :raises RuntimeError: if limiting is on and a structure is missing
            the "r_lp_Energy" property.
        """
        self.debug('writeBestStereos: %d' % len(stereoisomers))

        # List of structures to keep:
        sts_to_keep = []

        if not self.limit_stereos or (len(stereoisomers) <= self.maxstereo):
            # keep all stereoisomers:
            sts_to_keep = stereoisomers
        else:
            # More than MAXSTEREO stereoisomers: repeatedly select and
            # remove the lowest-energy remaining structure.
            kept_num = 0
            while kept_num < self.maxstereo:
                lowest_e = None
                best_variant = None
                for i, st in enumerate(stereoisomers):
                    try:
                        energy = st.property["r_lp_Energy"]
                    except KeyError:
                        raise RuntimeError(
                            "ERROR: The ligand has no potential energy information."
                        )
                    if lowest_e is None or energy < lowest_e:
                        best_variant = i
                        lowest_e = energy
                kept_num += 1
                best_st = stereoisomers[best_variant]
                sts_to_keep.append(best_st)
                stereoisomers.pop(best_variant)

        # Write structures (stereoisomers) to be kept from this iontaut:
        for st in sts_to_keep:
            # add ligand variant property based on current count of its
            # compound
            self.current_variant += 1
            st.property[self.outvariantfield] = "%s-%i" % (
                self.current_compound_id, self.current_variant)
            self.current_output_writer.append(st)
            self.sts_in_file += 1
            self.out_variants += 1
[docs] def nextOutFile(self): if self.current_output_writer: self.current_output_writer.close() if self.sts_in_file == 0 and not self.outfilenum == 0: raise RuntimeError("No structures were written to previous file!") if self.outfilenum != 0: self.debug("\nWriting to new file after writing %d structures" % self.sts_in_file) self.outfilenum += 1 self.sts_in_file = 0 self.current_output_file = self.genOutputFileName( 1, filenum=self.outfilenum, extension=".maegz") self.current_output_writer = structure.StructureWriter( self.current_output_file) self.output_files.append(self.current_output_file)
    def operate(self):
        """
        Optionally limits the number of stereoisomers in the input_files
        to <maxstereo> keeping lowest energy isomers. Optionally filters
        on ionization penalty.

        Relies on the "s_lp_label" property written by LigPrep's -lab
        option to group consecutive structures into ion/taut states.
        """
        self.input_ligands = self.getInput(1).getFiles()

        self.maxstereo = self['MAXSTEREO']
        if self['LIMIT_STEREOISOMERS']:
            self.limit_stereos = True
            if self.maxstereo > 32 or self.maxstereo < 1:
                self.exit("limit_stereo: MAXSTEREO must be between 1 and 32 !")
            self.warning(
                'WARNING: LIMIT_STEREOISOMERS option is deprecated. Use MAX_STEREOISOMERS option in LigPrepStage instead.'
            )
        else:
            # This is the default now, as LigPrepStage now handles stereo
            # expansion via the new -m option.
            self.limit_stereos = False

        if not self['OUTFORMAT'] in ["mae", "sdf"]:
            self.exit("Output Format needs to be .mae or .sdf!")

        self.uniquefield = self['UNIQUEFIELD']
        if self.uniquefield.upper() == "NONE":
            # VSW-807 So that badly-formatted VSW input files work
            self.warning(
                "WARNING: 'NONE' is not a valid value for the UNIQUEFIELD keyword. Using 's_m_title' instead."
            )
            self.uniquefield = "s_m_title"

        self.outvariantfield = self['OUTVARIANTFIELD']
        self.preserve_njobs = self['PRESERVE_NJOBS']

        found_ligands_wo_penalty = False
        if self['REMOVE_PENALIZED_STATES']:
            remove_states = True
        else:
            remove_states = False

        # Announce what this run will do:
        if self.limit_stereos and remove_states:
            self.info(
                "Removing penalized states and keeping %i stereoisomers per ion/taut of each compound."
                % self.maxstereo)
        elif self.limit_stereos:
            self.info(" Keeping %i stereoisomers per ion/taut of each compound."
                      % self.maxstereo)
        elif remove_states:
            self.info("Removing penalized states.")
        else:
            self.info(
                "Generating variant codes and recombining structure files.")

        # Output bookkeeping (shared with nextOutFile/writeBestStereos):
        self.output_files = []
        self.outfilenum = 0
        self.current_output_writer = None
        self.sts_in_file = 0
        self.create_new_out_file_on_next_write = True

        # Counters reported at the end:
        self.in_variants = 0
        self.in_roots = 0
        self.out_variants = 0
        self.current_compound_id = None
        self.current_variant = 0
        current_iontaut_state = None
        stereoisomers = []  # list of Structure objects

        for input_file in self.input_ligands:
            self.info("\nWorking on file: %s" % input_file)
            if self.preserve_njobs:
                # One output file per input file:
                self.create_new_out_file_on_next_write = True

            # Go through all structures in a file:
            st_num = 0
            for st in structure.StructureReader(input_file):
                st_num += 1
                self.in_variants += 1

                # Print progress period:
                if st_num % 1000 == 0:
                    sys.stdout.write(".")
                    sys.stdout.flush()

                self.debug('Input st: %d' % st_num)

                # Filter the structure on state penalty:
                if remove_states:
                    skip_state = False
                    if "r_epik_State_Penalty" in st.property:
                        # Ev:93901 Remove the state only if both penalties are
                        # greater than 2.5 (if both exist):
                        if st.property["r_epik_State_Penalty"] > 2.5:
                            if "r_epik_Metal_State_Penalty" in st.property:
                                if st.property[
                                        "r_epik_Metal_State_Penalty"] > 2.5:
                                    skip_state = True
                            else:
                                skip_state = True
                    else:
                        try:
                            # No r_epik_State_Penalty property:
                            if st.property[
                                    "r_ionizer_Ionization_penalty"] > 2.8:
                                skip_state = True
                            if st.property[
                                    "r_ionizer_Ionization_penalty_charging"] > 1.4:
                                skip_state = True
                            if st.property[
                                    "r_ionizer_Ionization_penalty_neutral"] > 2.1:
                                skip_state = True
                        except KeyError:
                            # No penalty info at all; keep the state and
                            # warn once at the end:
                            found_ligands_wo_penalty = True

                    if skip_state:
                        self.debug(' removed high penalty state')
                        continue  # Go to the next state

                # Get the compound code of this structure (by unique field):
                try:
                    # Ev:96648 & Ev: 104439
                    compound_id = pipeutils.read_unique_field(
                        st, self.uniquefield)
                except KeyError:
                    self.exit("No field %s ligand %s!" %
                              (self.uniquefield, st_num))

                # Get the iontaut state of the structure:
                try:
                    label = st.property["s_lp_label"]
                except KeyError:
                    self.exit(
                        "ERROR: LigPrep was not run, or was run without the -lab option."
                    )

                # Determine the ion/taut state number of this ligand by label:
                iontaut_state = label.split('_stereoizer')[0]

                self.debug(' compound_id: %s' % compound_id)
                self.debug(' iontaut state: %s' % iontaut_state)

                if (compound_id == self.current_compound_id) and (
                        iontaut_state == current_iontaut_state):
                    self.debug(' same iontaut state as previous')
                    # Same iontaut state as previous st (same compound):
                    stereoisomers.append(st)
                # Reached new state (may be same compound or different
                # compound):
                else:
                    self.debug(' new state')
                    if stereoisomers:  # Will not happen only for first st
                        self.debug(" writing old state's stereoisomers")
                        # Write out stereoisomers of last ion/taut state:
                        if self.create_new_out_file_on_next_write:
                            self.debug("creating new output file")
                            self.nextOutFile()
                            self.create_new_out_file_on_next_write = False
                        self.writeBestStereos(stereoisomers)
                    # Reset the state list:
                    current_iontaut_state = iontaut_state
                    stereoisomers = [st]

                    # Figure out if different compound than previous structure:
                    if compound_id != self.current_compound_id:
                        self.debug(' new compound')
                        self.in_roots += 1
                        self.current_compound_id = compound_id
                        self.current_variant = 0

        # DONE: All structures were read

        if stereoisomers:
            if not self.current_output_writer:
                # This will happen if the input set has only one iontaut state
                self.nextOutFile()
            # Write out stereoisomers of the very last ion/taut state:
            self.writeBestStereos(stereoisomers)

        # Close the last writer:
        self.current_output_writer.close()

        if found_ligands_wo_penalty:
            self.warning(
                "Some ligands did not have state penalty information causing all states to be retained"
            )

        msg = "\nNumber of compounds: %i" % self.in_roots
        msg += "\nNumber of input structures: %i" % self.in_variants
        msg += "\nNumber of output structures: %i" % self.out_variants
        self.info(msg)

        self.setOutput(
            1, pipeio.Structures(self.output_files, self.out_variants))
# EOF