# Source code for schrodinger.pipeline.stages.filtering

"""
Stages related to filtering structures.

Main stage uses $SCHRODINGER/utilities/ligfilter.

Copyright Schrodinger, LLC. All rights reserved.

"""

# Contributors: Matvey Adzhigirey

import os
import sys

import schrodinger.utils.fileutils as fileutils
import schrodinger.utils.subprocess as subprocess
from schrodinger import structure
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage

# 150 to 500 is drug-like; 100-150 and 500-600 are coarse:

# propname: (co min, dl min, dl max, co max)
#          LO    CO        DL        CO    LO

# Per-property range criteria used by DrugLikeSplitStage.qualify().
# Each value is a 4-tuple (coarse min, drug-like min, drug-like max,
# coarse max); None means that bound is not applied.
druglike_criteria = {
    'r_qp_mol_MW': (None, None, 500, 650),
    'r_qp_QPlogPo/w': (None, None, 5.0, 6.0),
    'r_qp_donorHB': (None, None, 5, None),
    'r_qp_accptHB': (None, None, 10, None),
    'r_qp_PSA': (None, None, 150, 175),
    'r_qp_QPlogS': (-7, -5, None, None),
}
# Qualification levels returned by DrugLikeSplitStage.qualify();
# higher value wins when combining variants of the same root.
DL = 3  # drug-like
CO = 2  # coarse
LO = 1  # left-over


class LigFilterStage(stage.Stage):
    """
    Stage interface for the LigFilter utility.

    The keywords specific to this stage are...

    FILTER_FILE
        File name for a Ligfilter criteria file.

    CONDITIONS
        String list of Ligfilter criteria. Ignored if a FILTER_FILE is
        specified.

    The stage takes one set of input structure files and generates one set
    of corresponding output files.
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        FILTER_FILE = string(default=None)
        CONDITIONS = string_list(default=None)
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.jobdj = None
        self.status = "NOT STARTED"
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

    def setupJobs(self):
        """
        Sets up LigFilter jobs, which are distributed via JobDJ. There will
        be one Ligfilter job for each input file.

        This method clears the working directory of previous output and log
        files (if any).

        Raises a RuntimeError if there is no FILTER_FILE and no CONDITIONS.
        """
        filter_file = self['FILTER_FILE']
        conds = self['CONDITIONS']
        if not filter_file and not conds:
            raise RuntimeError("No filtering conditions!")

        if filter_file:
            self.info("Using filter file: %s" % filter_file)
        else:  # CONDITIONS specified
            for cond in conds:
                cond = cond.strip("'").strip("\"")
                self.info('Condition: "%s"' % cond)

        self.jobdj = self.getJobDJ()
        self.ligfilter_jobnames = []
        self.input_files = self.getInput(1).getFiles()
        self.expected_output_files = {}  # key: subjobname

        for filenum, infile in enumerate(self.input_files):
            subjobname = self.genFileName(filenum=filenum + 1)
            # Work-around a limitation in LigFilter where if input is
            # SMILES-formatted, the output must also be:
            if fileutils.get_structure_file_format(infile) == "smiles":
                ext = ".smi"
            else:
                ext = ".maegz"
            outfile = self.genOutputFileName(1,
                                             filenum=filenum + 1,
                                             extension=ext)
            self.expected_output_files[subjobname] = outfile
            logfile = subjobname + '.log'

            # Remove the output/log files from previous run (if any):
            if os.path.isfile(outfile):
                os.remove(outfile)
            if os.path.isfile(logfile):
                os.remove(logfile)

            if filter_file:
                cmd = [
                    'ligfilter', '-f', filter_file, infile, '-o', outfile,
                    '-j', subjobname
                ]
            else:  # CONDITIONS specified
                cmd = ['ligfilter', infile, '-o', outfile, '-j', subjobname]
                for cond in conds:
                    cond = cond.strip("'").strip("\"")
                    cmd += ['-e', cond]

            self.debug("CMD: %s" % subprocess.list2cmdline(cmd))
            self.jobdj.addJob(cmd, jobname=subjobname)
            self.ligfilter_jobnames.append(subjobname)

    def processJobOutputs(self):
        """
        Reports the number of structures that passed the stage and sets the
        stage output (pipeio.Structures) from the non-empty output files.

        Exits with an error if no structure passed the filter.
        """
        processed_ligs = []
        for jobname in self.ligfilter_jobnames:
            outfile = self.expected_output_files[jobname]
            num_passed = self.num_passed[jobname]
            # An output file exists only when at least one structure passed:
            if num_passed > 0:
                self.checkFile(outfile)
                processed_ligs.append(outfile)

        num_passed_total = sum(self.num_passed.values())
        num_input_total = sum(self.num_input.values())
        msg = "\nLigFilter Summary: %i of %i structures passed the filter" % (
            num_passed_total, num_input_total)
        self.info(msg)

        if not processed_ligs:
            self.error("\nNone of the structures have passed LigFilter!")
            sys.exit(1)

        self.setOutput(1, pipeio.Structures(processed_ligs, num_passed_total))

    def operate(self):
        """
        Perform an operation on the input files.

        There are setup, running, and post-processing steps, and the stage
        records its current status so that it can be restarted in that step
        if there is a failure.

        Raises a RuntimeError if the JobDJ run() method fails, or if the
        stage finishes with an improper status.
        """
        if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
            self.status = "SETTING UP JOBS"
            self.setupJobs()
            self._job_outputs = []
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()

        if self.status == "RUNNING JOBS":
            # Update JobDJ to correct options (may change if restarting):
            self.setJobDJOptions(self.jobdj)

            # Ev:64286 if restarting, do not restart jobs that have output
            # files:
            if self.jobdj.hasStarted():
                self.info("Checking for completed subjobs...")
                for i, job in enumerate(self.jobdj.all_jobs):
                    subjobname = self.ligfilter_jobnames[i]
                    logfile = subjobname + '.log'
                    outfile = self.expected_output_files[subjobname]

                    if not os.path.isfile(logfile):
                        if job.isComplete():
                            self.jobdj.markForRestart(job, 'missinglog')
                            print('FAILED:', subjobname)
                        continue

                    # Look for the summary line in the subjob log file:
                    read_summary = False
                    num_passed = None
                    with open(logfile) as fh:
                        for line in fh:
                            if "Summary: " in line:
                                s = line.split()
                                num_passed = int(s[1])
                                read_summary = True
                                break

                    if not read_summary:
                        if job.isComplete():
                            self.jobdj.markForRestart(job, 'incompletelog')
                            print('FAILED:', subjobname)
                        continue

                    if num_passed > 0 and not os.path.isfile(outfile):
                        if job.isComplete():
                            # FIX: was job.jobdj.markForRestart() - jobs do
                            # not carry the JobDJ reference; use self.jobdj,
                            # consistent with every other call site:
                            self.jobdj.markForRestart(job, "nooutput")
                            print('FAILED:', subjobname)
                        continue

                    # If got here then subjob is complete.
                    if not job.isComplete():
                        print(' Output exists for the job:', outfile)
                        print(' Marking %s as completed' % subjobname)
                        job._markFinished('found-output')
                        continue
                self.dump()

            self.info("Running LigFilter jobs...")
            self.runJobDJ(self.jobdj)

            self.info("Checking subjob outputs...")
            self.num_input = {}
            self.num_passed = {}
            some_failed = False
            for i, job in enumerate(self.jobdj.all_jobs):
                subjobname = self.ligfilter_jobnames[i]
                logfile = subjobname + '.log'
                outfile = self.expected_output_files[subjobname]

                if not os.path.isfile(logfile):
                    # FIX: was job.jobdj.markForRestart() (see above):
                    self.jobdj.markForRestart(job, "nolog")
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue

                # The output file may be missing if no structures passed the
                # filter
                # Parse the LigFilter log file:
                read_summary = False
                num_passed = 0  # Stays zero if no Summary line in file
                num_input = 0
                with open(logfile) as fh:
                    for line in fh:
                        if "Summary: " in line:
                            s = line.split()
                            num_passed = int(s[1])
                            num_input = int(s[3])
                            read_summary = True
                            break

                if not read_summary:
                    self.jobdj.markForRestart(job, 'incompletelog')
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue

                if num_passed > 0 and not os.path.isfile(outfile):
                    self.jobdj.markForRestart(job, "nooutput")
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue

                self.num_passed[subjobname] = num_passed
                self.num_input[subjobname] = num_input

            if some_failed:
                self.dump()
                self.exit("Some jobs failed. Try restarting. Exiting...")

            self.status = "PROCESSING FILES"
            self.dump()

        if self.status == "PROCESSING FILES":
            self.processJobOutputs()
            self.status = "COMPLETE"
            self.dump()
class SubSetStage(stage.Stage):
    """
    Stage for making a subset of the input files based on a list of titles
    or other property.

    The keywords specific to this stage are:

    PROPERTY
        Property to filter on (default s_m_title)

    VALUE_FILE
        File containing a list of values to keep (one per line) if a
        FILTER_FILE is specified.

    The stage takes one set of input structure files and generates one set
    of corresponding output files.

    NOTE: SMILES format files can only be filtered on the title.
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        PROPERTY = string()
        VALUE_FILE = string()
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)
        self.jobdj = None
        self.status = "NOT STARTED"

    def filterSmilesFile(self, ligfile, outfile):
        """
        Filter the file <ligfile> by title based on VALUE_FILE and add the
        output file to self.output_files list

        Raises a RuntimeError if PROPERTY is anything but s_m_title, since
        a SMILES file carries no other properties.
        """
        if self.property != 's_m_title':
            err = "SMILES files can only be subseted on the title"
            raise RuntimeError(err)
        outnum = 0
        with open(ligfile) as infh, open(outfile, 'w') as outfh:
            for line in infh:
                self.num_input_sts += 1
                # Print progress period:
                if self.num_input_sts % 1000 == 0:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                s = line.split()
                if len(s) > 1:  # Title present
                    title = s[1]
                else:
                    # No title; can never match, skip it:
                    continue
                if title in self.allowed_values:
                    outnum += 1
                    outfh.write(line)
        self.num_output_sts += outnum
        if outnum > 0:
            self.output_files.append(outfile)

    def filterFile(self, ligfile, outfile):
        """
        Filter the file <ligfile> based on PROPERTY & VALUE_FILE and add the
        output file to self.output_files list

        Raises a RuntimeError if a structure is missing the PROPERTY field.
        """
        st_num = 0
        outnum = 0
        writer = structure.StructureWriter(outfile)
        # FIX: close the writer even when reading raises (the RuntimeError
        # below previously leaked the open writer):
        try:
            for st in pipeutils.get_reader(ligfile, astext=True):
                st_num += 1
                self.num_input_sts += 1
                # Print progress period:
                if self.num_input_sts % 1000 == 0:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                try:
                    value = st.property[self.property]
                except KeyError:
                    err = "Error reading field %s for st %i (file %s)" % (
                        self.property, st_num, ligfile)
                    raise RuntimeError(err)
                if value in self.allowed_values:
                    outnum += 1
                    writer.append(st)
        finally:
            writer.close()
        self.num_output_sts += outnum
        if outnum > 0:
            self.output_files.append(outfile)

    def readValueList(self):
        """
        Read VALUE_FILE and populate self.allowed_values with its values,
        converted to the type implied by the PROPERTY prefix (i_/r_/s_).

        Raises a ValueError if PROPERTY has an unrecognized prefix or a
        value in the file cannot be converted.
        """
        self.property = self['PROPERTY']
        value_file = self['VALUE_FILE']
        allowed_values = []
        # Pick the conversion from the m2io-style property-name prefix:
        if self.property.startswith('i_'):
            conversion_func = int
        elif self.property.startswith('r_'):
            conversion_func = float
        elif self.property.startswith('s_'):
            conversion_func = str
        else:
            msg = 'Invalid property for SubSetStage: "%s"' % self.property
            raise ValueError(msg)
        # Generate a list of allowed values:
        with open(value_file) as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    value = conversion_func(line)
                except ValueError:
                    msg = "Invalid value (%s) for property %s" % (
                        line, self.property)
                    raise ValueError(msg)
                allowed_values.append(value)
        # FIX: store as a set for O(1) membership tests in the filter
        # methods (was a list: O(n) per structure):
        self.allowed_values = set(allowed_values)

    def operate(self):
        """
        Perform the filtering operation on the input files.
        """
        if self.status == "NOT STARTED":
            self.readValueList()  # Generate list of allowed values
            self.input_files = self.getInput(1).getFiles()
            self.curr_file = 1
            self.output_files = []
            self.num_input_sts = 0
            self.num_output_sts = 0
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()

        if self.status == "RUNNING JOBS":
            # Continue with the file that we stopped with (or start with file
            # 1)
            while self.curr_file <= len(self.input_files):
                infile = self.input_files[self.curr_file - 1]
                ext = fileutils.splitext(os.path.basename(infile))[1]
                if ext == '.sdf.gz':  # Ev:93758
                    ext = '.sdf'
                outfile = self.genOutputFileName(1,
                                                 filenum=self.curr_file,
                                                 extension=ext)
                if ext in ['.smi', '.smiles']:
                    self.filterSmilesFile(infile, outfile)
                else:
                    self.filterFile(infile, outfile)
                # ^--- will add the output file to self.output_files
                self.curr_file += 1
                self.dump()  # so that if restarting we pick up from next file

            msg = "\nSubSet Summary: %i of %i structures were kept" % (
                self.num_output_sts, self.num_input_sts)
            self.info(msg)

            if self.num_output_sts == 0:
                self.error("\nSUBSET ERROR: None of the structures were kept!")
                sys.exit(1)

            self.setOutput(
                1, pipeio.Structures(self.output_files, self.num_output_sts))
            self.status = "COMPLETE"
            self.dump()
class DrugLikeSplitStage(stage.Stage):
    """
    Stage for splitting the input ligand structures into 3 groups:

    Drug-like ligands: -dl.maegz
    Course ligands: -co.maegz
    Left-over ligands: -lo.maegz

    Based on the specified criteria. If at least one variant of a root
    matches all criteria for a drug-like ligand, then all variants of that
    root are included in the dl.maegz output file. If none match, then if at
    least one variant matches all criteria for coarse ligand, then all
    variants are included in the co.maegz file. Otherwise all variants are
    included in the lo.maegz file.

    All variants for the same root must be listed in blocks (next to each
    other) in the input file and have the same title.

    Also labels variants by setting s_vsw_variant field to <title>-#
    Where # is a variant number (1 to n).
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        UNIQUEFIELD = string(default="s_m_title")
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.status = "NOT STARTED"
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", False)
        self.addExpectedOutput(2, "structures", False)
        self.addExpectedOutput(3, "structures", False)

    def operate(self):
        """
        Perform an operation on the input files.
        """
        ligands = self.getInput(1).getFiles()

        # Ev:98054 Print the filter criteria:
        self.info("Structures will be filtered on the following criteria:")
        self.info("  Drug-like criteria:")
        for prop, c in druglike_criteria.items():
            dl_min = c[1]
            dl_max = c[2]
            if dl_min and dl_max:
                self.info("    %s: between %s and %s" % (prop, dl_min, dl_max))
            elif dl_min:
                self.info("    %s: more than %s" % (prop, dl_min))
            elif dl_max:
                self.info("    %s: less than %s" % (prop, dl_max))
        self.info(
            "  Coarse criteria (for structures which do not match the above criteria):"
        )
        for prop, c in druglike_criteria.items():
            co_min = c[0]
            co_max = c[3]
            if co_min and co_max:
                self.info("    %s: between %s and %s" % (prop, co_min, co_max))
            elif co_min:
                self.info("    %s: more than %s" % (prop, co_min))
            elif co_max:
                self.info("    %s: less than %s" % (prop, co_max))
        self.info("  All other structures are 'left-overs'")
        self.info("")

        self.dl_writer = structure.MultiFileStructureWriter(
            self.getOutputName(1), '.maegz')
        self.co_writer = structure.MultiFileStructureWriter(
            self.getOutputName(2), '.maegz')
        self.lo_writer = structure.MultiFileStructureWriter(
            self.getOutputName(3), '.maegz')

        self.num_dl_roots = 0
        self.num_co_roots = 0
        self.num_lo_roots = 0
        self.num_input_roots = 0
        self.num_input_sts = 0
        uniquefield = self['UNIQUEFIELD']

        self.info("Reading and splitting input files...")
        # It is possible that some structures had properties missing:
        self.missing_props = set()

        # -1 is a sentinel: no root has been read yet (a real root value is
        # a property string, never the int -1):
        current_root = -1
        current_root_qualification = LO
        root_sts = []
        for ligfile in ligands:
            self.info("\nReading file: %s" % ligfile)
            for st in structure.StructureReader(ligfile):
                self.num_input_sts += 1
                if self.num_input_sts % 1000 == 0:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                if st.property[uniquefield] != current_root:
                    self.num_input_roots += 1
                    # New root:
                    if current_root != -1:
                        self.writeRoot(root_sts, current_root,
                                       current_root_qualification)
                    current_root = st.property[uniquefield]
                    root_sts = []
                    current_root_qualification = LO
                variant_qualification = self.qualify(st)
                # The best qualification among the variants decides where
                # the whole root goes:
                if variant_qualification > current_root_qualification:
                    current_root_qualification = variant_qualification
                root_sts.append(st)

        # write sts for last root to file:
        if current_root != -1:
            self.writeRoot(root_sts, current_root, current_root_qualification)

        self.dl_writer.close()
        self.co_writer.close()
        self.lo_writer.close()

        dl_output_files = self.dl_writer.getFiles()
        co_output_files = self.co_writer.getFiles()
        lo_output_files = self.lo_writer.getFiles()

        self.info('')
        for prop in self.missing_props:
            self.warning('ERROR some structures were missing property: %s' %
                         prop)

        self.info("Number of structures processed: %i" % self.num_input_sts)
        self.info("           compounds: %i" % self.num_input_roots)

        dl_count = self.dl_writer.getNumStructures()
        co_count = self.co_writer.getNumStructures()
        lo_count = self.lo_writer.getNumStructures()

        self.info("")
        self.info('Number of drug-like structures: %i' % dl_count)
        self.info('           compounds: %i' % self.num_dl_roots)
        self.info('Number of coarse structures: %i' % co_count)
        self.info('           compounds: %i' % self.num_co_roots)
        self.info('Number of left-over structures: %i' % lo_count)
        self.info('           compounds: %i' % self.num_lo_roots)
        self.info('')
        self.info('Total number of output structures: %i' %
                  (dl_count + co_count + lo_count))
        self.info('           compounds: %i' %
                  (self.num_dl_roots + self.num_co_roots + self.num_lo_roots))

        # Should never happen:
        if not dl_output_files and not co_output_files and not lo_output_files:
            self.error("NO OUTPUT FILES PRODUCED")
            self.exit(1)

        if dl_output_files:
            self.setOutput(1, pipeio.Structures(dl_output_files, dl_count))
        if co_output_files:
            self.setOutput(2, pipeio.Structures(co_output_files, co_count))
        if lo_output_files:
            self.setOutput(3, pipeio.Structures(lo_output_files, lo_count))

    def qualify(self, st):
        """
        Returns module constant classification for the structure:
        Drug-like, coarse, etc.
        """
        qualifies_for_dl = True
        for prop, c in druglike_criteria.items():
            co_min = c[0]
            dl_min = c[1]
            dl_max = c[2]
            co_max = c[3]
            try:
                value = st.property[prop]
            # FIX: was a bare "except:"; only a missing property key should
            # downgrade the structure:
            except KeyError:
                self.missing_props.add(prop)  # No QikProp properties
                return LO
            if co_max is not None and value > co_max:
                return LO
            if co_min is not None and value < co_min:
                return LO
            # if did not qualify for CO would not even go here
            if qualifies_for_dl:  # If failed DL filter before, don't check again
                if dl_max is not None and value > dl_max:
                    qualifies_for_dl = False
                if dl_min is not None and value < dl_min:
                    qualifies_for_dl = False
        if qualifies_for_dl:
            return DL
        else:
            return CO

    def writeRoot(self, root_sts, current_root, current_root_qualification):
        """
        Label each variant of the root (s_vsw_variant = "<root>-<n>") and
        append all of them to the writer matching the root's qualification,
        incrementing the corresponding root counter.
        """
        for variant, st in enumerate(root_sts):
            st.property['s_vsw_variant'] = "%s-%i" % (current_root,
                                                      variant + 1)
            if current_root_qualification == DL:
                self.dl_writer.append(st)
            elif current_root_qualification == CO:
                self.co_writer.append(st)
            else:
                self.lo_writer.append(st)
        if current_root_qualification == DL:
            self.num_dl_roots += 1
        elif current_root_qualification == CO:
            self.num_co_roots += 1
        else:
            self.num_lo_roots += 1
class MergeDuplicatesStage(stage.Stage):
    """
    Stage for removing duplicate variants within each root (compound) based
    on unique SMILES strings.

    NOTE: All variants of each root must be in consecutive order in the
    input.
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        NEUTRALIZE = boolean(default=False) # Whether to neutralize the SMILES string before merging.
        DESALT = boolean(default=False) # Whether to desalt the SMILES string before merging.
        MERGE_PROPS = boolean(default=True) # Whether merge properties or to keep only the properties from the first-occuring structure. If merging, duplicate values are concatenated using commas.
        OUTFORMAT = option("sdf", "canvascsv", "smiles", default="sdf") # Format to export the output to.
        SMILES_FIELD = string(default="VendorSMILES") # Property name where to store the SMILES string
        CODE_FIELD = string(default=None) # Property name where to store the unique compound code.
        CODE_PREFIX = string(default=None) # String to prepend to the unique compound code.
        REQUIRE_PROPS = boolean(default=False) # Whether to skip all structures that have no properties
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

    def operate(self):
        """
        Perform an operation on the input files.

        Runs $SCHRODINGER/utilities/merge_duplicates on the input files and
        parses its log for the output file names and structure count.

        Raises a RuntimeError if the utility exits non-zero or produces no
        output files; ValueError for an invalid OUTFORMAT.
        """
        in1 = self.getInput(1)
        input_files = in1.getFiles()

        # FIXME if restarting call with -restart option.
        jobname = self.genFileName()
        exe = os.path.join(os.environ["SCHRODINGER"], "utilities",
                           "merge_duplicates")
        cmd = [exe, '-j', jobname]
        if self['NEUTRALIZE']:
            cmd.append('-neutralize')
        if self['DESALT']:
            cmd.append('-desalt')
        if not self['MERGE_PROPS']:
            cmd.append('-nomerge')

        if self['OUTFORMAT'] == 'canvascsv':
            cmd.append('-csv')
        # FIX: was comparing to 'cmiles', so the documented OUTFORMAT value
        # "smiles" fell through to the else-branch and raised ValueError:
        elif self['OUTFORMAT'] == 'smiles':
            cmd.append('-smiles')
        elif self['OUTFORMAT'] == 'sdf':
            pass
        else:
            raise ValueError("Invalid value for OUTFORMAT option: '%s'" %
                             self['OUTFORMAT'])

        if self['SMILES_FIELD']:
            cmd += ['-smilesfield', self['SMILES_FIELD']]
        # Will catch both None and "None"
        if str(self['CODE_FIELD']) != "None":
            cmd += ['-codefield', self['CODE_FIELD']]
        # Will catch both None and "None"
        if self['CODE_PREFIX'] and str(self['CODE_PREFIX']) != "None":
            cmd += ['-codeprefix', self['CODE_PREFIX']]
        if self['REQUIRE_PROPS']:
            cmd.append('-requireprops')
        # FIXME add for the latest verion of the script:
        # cmd += ['-max_per_file', '1000000']
        cmd += input_files

        print('RUNNING:', subprocess.list2cmdline(cmd))
        logfile = jobname + '-merge.log'
        with open(logfile, 'w') as fh:
            out = subprocess.call(cmd, stdout=fh, stderr=fh)
        print('')
        if out != 0:
            print('merge_duplicates process has terminated')
            raise RuntimeError(
                "ERROR: $SCHRODINGER/utilities/merge_duplicates exited with error code: %s"
                % out)
        print('merge_duplicates process has completed successfully')

        # Harvest the output file names and structure count from the log:
        output_files = []
        num_kept_structures = None
        with open(logfile) as fh:
            for line in fh:
                if line.startswith("Output file: "):
                    # Everything after the "Output file: " prefix (13 chars):
                    outputfile = line.rstrip('\r\n')[13:]
                    output_files.append(outputfile)
                elif line.startswith("Number of unique structures exported:"):
                    num_kept_structures = int(line.split(':')[1])

        if not output_files:
            raise RuntimeError(
                "ERROR: $SCHRODINGER/utilities/merge_duplicates did not return any files"
            )

        print('Output files:', output_files)
        print('Number of output structures:', num_kept_structures)
        self.setOutput(1, pipeio.Structures(output_files,
                                            num_kept_structures))
class GeneratePropsStage(stage.Stage):
    """
    Ev:85070

    For each structure in the input set, runs:
    $SCHRODINGER/run generate_ligfilter_properties.py, which generates
    properties specified via the ligfilter.predefined_function_dict.
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        """
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.jobdj = None
        self.status = "NOT STARTED"
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

    def setupJobs(self):
        """
        Sets up subjobs, which are distributed via JobDJ. There will be one
        subjob per each input file.
        """
        self.jobdj = self.getJobDJ()
        self.subjobnames = []
        self.input_files = self.getInput(1).getFiles()
        self.expected_output_files = {}  # key: subjobname

        for filenum, infile in enumerate(self.input_files):
            subjobname = self.genFileName(filenum=filenum + 1)
            outfile = self.genOutputFileName(1,
                                             filenum=filenum + 1,
                                             extension=".maegz")
            self.expected_output_files[subjobname] = outfile
            logfile = subjobname + '.log'

            # Remove the output/log files from previous run (if any):
            if os.path.isfile(outfile):
                os.remove(outfile)
            if os.path.isfile(logfile):
                os.remove(logfile)

            cmd = [
                'run', 'generate_ligfilter_properties_startup.py', infile,
                outfile, '-j', subjobname
            ]
            self.debug("CMD: %s" % subprocess.list2cmdline(cmd))
            self.jobdj.addJob(cmd, jobname=subjobname)
            self.subjobnames.append(subjobname)

    def processJobOutputs(self):
        """
        Analyzes the output files from the subjobs and sets the stage
        output.

        Raises a RuntimeError if there are any errors with the subjobs.
        """
        output_files = []
        num_processed = 0
        for jobname in self.subjobnames:
            outfile = self.expected_output_files[jobname]
            num_processed += self.num_processed[jobname]
            self.checkFile(outfile)
            output_files.append(outfile)

        msg = "\nAll subjobs have completed. LigFilter properties have been generated."
        msg += "\nNumber of processed structures: %i" % num_processed
        msg += "\nOutput files: %s" % output_files
        self.info(msg)

        if not output_files:
            self.error("\nNo output files!")
            sys.exit(1)

        # FIXME report the number of output structures:
        self.setOutput(1, pipeio.Structures(output_files, num_processed))

    def operate(self):
        """
        Perform an operation on the input files.

        There are setup, running, and post-processing steps, and the stage
        records its current status so that it can be restarted in that step
        if there is a failure.

        Raises a RuntimeError if the JobDJ run() method fails, or if the
        stage finishes with an improper status.
        """
        if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
            self.status = "SETTING UP JOBS"
            self.setupJobs()
            self._job_outputs = []
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()

        if self.status == "RUNNING JOBS":
            # Update JobDJ to correct options (may change if restarting):
            self.setJobDJOptions(self.jobdj)

            # Ev:64286 if restarting, do not restart jobs that have output
            # files:
            if self.jobdj.hasStarted():
                self.info("Checking for completed subjobs...")
                for i, job in enumerate(self.jobdj.all_jobs):
                    subjobname = self.subjobnames[i]
                    logfile = subjobname + '.log'
                    outfile = self.expected_output_files[subjobname]

                    if not os.path.isfile(logfile):
                        if job.isComplete():
                            self.jobdj.markForRestart(job, 'missinglog')
                            print('FAILED:', subjobname)
                        continue

                    # Look for the completion line in the subjob log file:
                    job_complete = False
                    num_processed = None
                    with open(logfile) as fh:
                        for line in fh:
                            if line.startswith(
                                    "Number of processed structures:"):
                                s = line.split(":")
                                num_processed = int(s[1])
                                job_complete = True
                                break

                    if not job_complete:
                        if job.isComplete():
                            self.jobdj.markForRestart(job, 'incompletelog')
                            # FIX: removed dead "some_failed = True" here -
                            # some_failed is initialized (to False) only in
                            # the post-run loop below, so the value was
                            # never read:
                            print('FAILED:', subjobname)
                        continue

                    if not os.path.isfile(outfile):
                        if job.isComplete():
                            self.jobdj.markForRestart(job, "nooutput")
                            print('FAILED:', subjobname)
                        continue

                    # If got here then subjob is complete.
                    if not job.isComplete():
                        print(' Output exists for the job:', outfile)
                        print(' Marking %s as completed' % subjobname)
                        job._markFinished('found-output')
                        continue
                self.dump()

            self.info("Running Generate LigFilter Properties subjobs...")
            self.runJobDJ(self.jobdj)

            self.info("Checking subjob outputs...")
            self.num_processed = {}
            some_failed = False
            for i, job in enumerate(self.jobdj.all_jobs):
                subjobname = self.subjobnames[i]
                logfile = subjobname + '.log'
                outfile = self.expected_output_files[subjobname]

                if not os.path.isfile(logfile):
                    self.jobdj.markForRestart(job, "nolog")
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue

                # Parse the subjob log file:
                job_complete = False
                num_processed = 0  # Stays zero if no Summary line in file
                with open(logfile) as fh:
                    for line in fh:
                        if line.startswith("Number of processed structures:"):
                            s = line.split(":")
                            num_processed = int(s[1])
                            job_complete = True
                            break

                if not job_complete:
                    self.jobdj.markForRestart(job, 'incompletelog')
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue

                if not os.path.isfile(outfile):
                    self.jobdj.markForRestart(job, "nooutput")
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue

                self.num_processed[subjobname] = num_processed

            if some_failed:
                self.dump()
                self.exit("Some jobs failed. Try restarting. Exiting...")

            self.status = "PROCESSING FILES"
            self.dump()

        if self.status == "PROCESSING FILES":
            self.processJobOutputs()
            self.status = "COMPLETE"
            self.dump()
class ChargeFilterStage(stage.Stage):
    """
    Stage-based class for filtering a set of structure files by total
    charge. MIN_CHARGE and MAX_CHARGE are the two keywords specific to this
    stage. If a structure has a total charge within
    [MIN_CHARGE,MAX_CHARGE] (inclusive), it is retained; otherwise, the
    structure is filtered out.

    The stage takes one input structure file set and generates one set of
    corresponding output structure files.
    """

    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        specs = """
        MIN_CHARGE = integer(default=None)
        MAX_CHARGE = integer(default=None)
        """
        # FIX: the base class was previously initialized TWICE (first
        # without specs, then with them); a single call is sufficient:
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)

    def operate(self):
        """
        Read all the structures in the input files. If a structure's total
        charge is between MIN_CHARGE and MAX_CHARGE, write it to a the
        corresponding output file.

        Raises an IOError if there is a problem reading an input file or
        writing an output file, and raises a SystemExit if there are no
        output structures.
        """
        input_files = self.getInput(1).getFiles()
        min_charge = self['MIN_CHARGE']
        max_charge = self['MAX_CHARGE']

        msg = 'Retaining ligands whose charge falls within [%s, %s]' % \
            (min_charge, max_charge)
        self.debug(msg)

        output_files = []
        # FIXME: Generate only ONE output file (or up to 50K files)
        num_passed = 0
        filenum = 0
        for infile in input_files:
            self.info("Processing file %s..." % infile)
            sts_in_file = 0
            filenum += 1
            outfile = self.genOutputFileName(1,
                                             filenum=filenum,
                                             extension='.maegz')
            writer = structure.StructureWriter(outfile)
            # FIXME: Add processing period printout
            st_num = 0
            for st in structure.StructureReader(infile):
                st_num += 1
                # Calculate the formal charge for the entire structure:
                charge = st.formal_charge
                msg = "File %i ligand %i charge: %s" % (filenum, st_num,
                                                        charge)
                self.debug(msg)
                if min_charge is not None and charge < min_charge:
                    self.debug('  skipped')
                    continue  # skip the ligand, and go to the next
                if max_charge is not None and charge > max_charge:
                    self.debug('  skipped')
                    continue  # skip the ligand, and go to the next
                num_passed += 1
                self.debug('  retained')
                writer.append(st)
                sts_in_file += 1
            # FIX: close the writer for EVERY input file; previously only
            # the writer of the last file was closed (and closing raised a
            # NameError when the input list was empty):
            writer.close()
            if sts_in_file:  # If added anything to this file
                output_files.append(outfile)

        if not output_files:
            self.warning("None of the ligands have passed the Charge Filter!")
            raise SystemExit

        self.setOutput(1, pipeio.Structures(output_files, num_passed))
# EOF