"""
Stages related to filtering structures.
Main stage uses $SCHRODINGER/utilities/ligfilter.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Matvey Adzhigirey
import os
import sys
import schrodinger.utils.fileutils as fileutils
import schrodinger.utils.subprocess as subprocess
from schrodinger import structure
from schrodinger.pipeline import pipeio
from schrodinger.pipeline import pipeutils
from schrodinger.pipeline import stage
# Property windows used by DrugLikeSplitStage to classify ligands.
# 150 to 500 is drug-like; 100-150 and 500-600 are coarse:
# propname: (co min, dl min, dl max, co max)
#            LO  |  CO  |  DL  |  CO  | LO
druglike_criteria = {
    'r_qp_mol_MW': (None, None, 500, 650),
    'r_qp_QPlogPo/w': (None, None, 5.0, 6.0),
    'r_qp_donorHB': (None, None, 5, None),
    'r_qp_accptHB': (None, None, 10, None),
    'r_qp_PSA': (None, None, 150, 175),
    'r_qp_QPlogS': (-7, -5, None, None),
}
# Qualification levels, ordered so a "better" class compares greater:
DL = 3  # drug-like
CO = 2  # coarse
LO = 1  # left-over
class LigFilterStage(stage.Stage):
    """
    Stage interface for the LigFilter utility.

    The keywords specific to this stage are...

    FILTER_FILE   File name for a Ligfilter criteria file.

    CONDITIONS    String list of Ligfilter criteria. Ignored
                  if a FILTER_FILE is specified.

    The stage takes one set of input structure files and generates one set
    of corresponding output files.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        # Configspec for the stage-specific keywords:
        specs = """
FILTER_FILE = string(default=None)
CONDITIONS = string_list(default=None)
"""
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.jobdj = None  # JobDJ instance; created in setupJobs()
        self.status = "NOT STARTED"  # restart bookkeeping used by operate()
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)
    def setupJobs(self):
        """
        Sets up LigFilter jobs, which are distributed via JobDJ. There will
        be one Ligfilter job for each input file. This method clears the
        working directory of previous output and log files (if any). Raises
        a RuntimeError if there is no FILTER_FILE and no CONDITIONS.
        """
        filter_file = self['FILTER_FILE']
        conds = self['CONDITIONS']
        if not filter_file and not conds:
            raise RuntimeError("No filtering conditions!")
        if filter_file:
            self.info("Using filter file: %s" % filter_file)
        else:  # CONDITIONS specified
            for cond in conds:
                # Strip optional surrounding quotes before echoing:
                cond = cond.strip("'").strip("\"")
                self.info('Condition: "%s"' % cond)
        self.jobdj = self.getJobDJ()
        self.ligfilter_jobnames = []
        self.input_files = self.getInput(1).getFiles()
        self.expected_output_files = {}  # key: subjobname
        for filenum, infile in enumerate(self.input_files):
            subjobname = self.genFileName(filenum=filenum + 1)
            # Work-around a limitation in LigFilter where if input is
            # SMILES-formatted, the output must also be:
            if fileutils.get_structure_file_format(infile) == "smiles":
                ext = ".smi"
            else:
                ext = ".maegz"
            outfile = self.genOutputFileName(1,
                                             filenum=filenum + 1,
                                             extension=ext)
            self.expected_output_files[subjobname] = outfile
            logfile = subjobname + '.log'
            # Remove the output/log files from previous run (if any):
            if os.path.isfile(outfile):
                os.remove(outfile)
            if os.path.isfile(logfile):
                os.remove(logfile)
            if filter_file:
                cmd = [
                    'ligfilter', '-f', filter_file, infile, '-o', outfile,
                    '-j', subjobname
                ]
            else:  # CONDITIONS specified
                cmd = ['ligfilter', infile, '-o', outfile, '-j', subjobname]
                for cond in conds:
                    cond = cond.strip("'").strip("\"")
                    cmd += ['-e', cond]
            self.debug("CMD: %s" % subprocess.list2cmdline(cmd))
            self.jobdj.addJob(cmd, jobname=subjobname)
            self.ligfilter_jobnames.append(subjobname)
[docs] def processJobOutputs(self):
"""
Reports the number of
structures that passed the stage. Raises a RuntimeError if any
Ligfilter .log file is missing the summary information.
"""
filenum = 0
processed_ligs = []
for jobname in self.ligfilter_jobnames:
outfile = self.expected_output_files[jobname]
num_passed = self.num_passed[jobname]
if num_passed > 0:
filenum += 1
self.checkFile(outfile)
processed_ligs.append(outfile)
num_passed_total = sum(self.num_passed.values())
num_input_total = sum(self.num_input.values())
msg = "\nLigFilter Summary: %i of %i structures passed the filter" % (
num_passed_total, num_input_total)
self.info(msg)
if not processed_ligs:
self.error("\nNone of the structures have passed LigFilter!")
sys.exit(1)
self.setOutput(1, pipeio.Structures(processed_ligs, num_passed_total))
[docs] def operate(self):
"""
Perform an operation on the input files. There are setup, running,
and post-processing steps, and the stage records its current status so
that it can be restarted in that step if there is a failure. Raises
a RuntimeError if the JobDJ run() method fails, or if the stage
finishes with an improper status.
"""
if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
self.status = "SETTING UP JOBS"
self.setupJobs()
self._job_outputs = []
# If restarting, go directly to next step.
self.status = "RUNNING JOBS"
self.dump()
if self.status == "RUNNING JOBS":
# Update JobDJ to correct options (may change if restarting):
self.setJobDJOptions(self.jobdj)
# Ev:64286 if restarting, do not restart jobs that have output
# files:
if self.jobdj.hasStarted():
self.info("Checking for completed subjobs...")
for i, job in enumerate(self.jobdj.all_jobs):
subjobname = self.ligfilter_jobnames[i]
logfile = subjobname + '.log'
outfile = self.expected_output_files[subjobname]
if not os.path.isfile(logfile):
if job.isComplete():
self.jobdj.markForRestart(job, 'missinglog')
print('FAILED:', subjobname)
continue
read_summary = False
num_passed = None
with open(logfile) as fh:
for line in fh:
if "Summary: " in line:
s = line.split()
num_passed = int(s[1])
read_summary = True
break
if not read_summary:
if job.isComplete():
self.jobdj.markForRestart(job, 'incompletelog')
some_failed = True
print('FAILED:', subjobname)
continue
if num_passed > 0 and not os.path.isfile(outfile):
if job.isComplete():
job.jobdj.markForRestart(job, "nooutput")
print('FAILED:', subjobname)
continue
# If got here then subjob is complete.
if not job.isComplete():
print(' Output exists for the job:', outfile)
print(' Marking %s as completed' % subjobname)
job._markFinished('found-output')
continue
self.dump()
self.info("Running LigFilter jobs...")
self.runJobDJ(self.jobdj)
self.info("Checking subjob outputs...")
self.num_input = {}
self.num_passed = {}
some_failed = False
#self.ligfilter_output_files = []
for i, job in enumerate(self.jobdj.all_jobs):
subjobname = self.ligfilter_jobnames[i]
logfile = subjobname + '.log'
outfile = self.expected_output_files[subjobname]
if not os.path.isfile(logfile):
job.jobdj.markForRestart(job, "nolog")
some_failed = True
print('FAILED:', subjobname)
continue
# The output file may be missing if no structures passed the
# filter
# Parse the LigFilter log file:
read_summary = False
num_passed = 0 # Stays zero if no Summary line in file
num_input = 0
with open(logfile) as fh:
for line in fh:
if "Summary: " in line:
s = line.split()
num_passed = int(s[1])
num_input = int(s[3])
read_summary = True
break
if not read_summary:
self.jobdj.markForRestart(job, 'incompletelog')
some_failed = True
print('FAILED:', subjobname)
continue
if num_passed > 0 and not os.path.isfile(outfile):
self.jobdj.markForRestart(job, "nooutput")
some_failed = True
print('FAILED:', subjobname)
continue
self.num_passed[subjobname] = num_passed
self.num_input[subjobname] = num_input
if some_failed:
self.dump()
self.exit("Some jobs failed. Try restarting. Exiting...")
self.status = "PROCESSING FILES"
self.dump()
if self.status == "PROCESSING FILES":
self.processJobOutputs()
self.status = "COMPLETE"
self.dump()
class SubSetStage(stage.Stage):
    """
    Stage for making a subset of the input files based on a list of
    titles or other property.

    The keywords specific to this stage are:

    PROPERTY    Property to filter on (default s_m_title)

    VALUE_FILE  File containing a list of values to keep (one per line).

    The stage takes one set of input structure files and generates one set
    of corresponding output files.

    NOTE: SMILES format files can only be filtered on the title.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        # Configspec for the stage-specific keywords:
        specs = """
PROPERTY = string()
VALUE_FILE = string()
"""
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)
        self.jobdj = None  # not used by this stage; kept for interface parity
        self.status = "NOT STARTED"  # restart bookkeeping used by operate()
[docs] def filterSmilesFile(self, ligfile, outfile):
"""
Filter the file <ligfile> by title based on VALUE_FILE and add the
output file to self.output_files list
"""
if self.property != 's_m_title':
err = "SMILES files can only be subseted on the title"
raise RuntimeError(err)
outnum = 0
with open(ligfile) as infh, open(outfile, 'w') as outfh:
for line in infh:
self.num_input_sts += 1
# Print progress period:
if self.num_input_sts % 1000 == 0:
sys.stdout.write(".")
sys.stdout.flush()
s = line.split()
if len(s) > 1: # Title present
title = s[1]
else:
continue
if title in self.allowed_values:
outnum += 1
outfh.write(line)
self.num_output_sts += outnum
if outnum > 0:
self.output_files.append(outfile)
[docs] def filterFile(self, ligfile, outfile):
"""
Filter the file <ligfile> based on PROPERTY & VALUE_FILE and add the
output file to self.output_files list
"""
st_num = 0
outnum = 0
writer = structure.StructureWriter(outfile)
for st in pipeutils.get_reader(ligfile, astext=True):
st_num += 1
self.num_input_sts += 1
# Print progress period:
if self.num_input_sts % 1000 == 0:
sys.stdout.write(".")
sys.stdout.flush()
try:
value = st.property[self.property]
except KeyError:
err = "Error reading field %s for st %i (file %s)" % (
self.property, st_num, ligfile)
raise RuntimeError(err)
if value in self.allowed_values:
outnum += 1
writer.append(st)
writer.close()
self.num_output_sts += outnum
if outnum > 0:
self.output_files.append(outfile)
[docs] def readValueList(self):
self.property = self['PROPERTY']
value_file = self['VALUE_FILE']
self.allowed_values = []
if self.property.startswith('i_'):
conversion_func = int
elif self.property.startswith('r_'):
conversion_func = float
elif self.property.startswith('s_'):
conversion_func = str
else:
msg = 'Invalid property for SubSetStage: "%s"' % self.property
raise ValueError(msg)
# Generate a list of allowed values:
with open(value_file) as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
value = conversion_func(line)
except ValueError:
msg = "Invalid value (%s) for property %s" % (line,
self.property)
raise ValueError(msg)
self.allowed_values.append(value)
    def operate(self):
        """
        Perform the filtering operation on the input files.

        Files are processed one at a time; self.curr_file is persisted via
        dump() after each file so a restarted stage resumes with the file
        it stopped on. Exits with an error if no structures were kept.
        """
        if self.status == "NOT STARTED":
            self.readValueList()  # Generate list of allowed values
            self.input_files = self.getInput(1).getFiles()
            self.curr_file = 1  # 1-based index of the next file to process
            self.output_files = []
            self.num_input_sts = 0
            self.num_output_sts = 0
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()
        if self.status == "RUNNING JOBS":
            # Continue with the file that we stopped with (or start with file
            # 1)
            while self.curr_file <= len(self.input_files):
                infile = self.input_files[self.curr_file - 1]
                ext = fileutils.splitext(os.path.basename(infile))[1]
                if ext == '.sdf.gz':
                    # Ev:93758
                    ext = '.sdf'
                outfile = self.genOutputFileName(1,
                                                 filenum=self.curr_file,
                                                 extension=ext)
                # SMILES files can only be filtered on the title:
                if ext in ['.smi', '.smiles']:
                    self.filterSmilesFile(infile, outfile)
                else:
                    self.filterFile(infile, outfile)
                # ^--- will add the output file to self.output_files
                self.curr_file += 1
                self.dump()  # so that if restarting we pick up from next file
            msg = "\nSubSet Summary: %i of %i structures were kept" % (
                self.num_output_sts, self.num_input_sts)
            self.info(msg)
            if self.num_output_sts == 0:
                self.error("\nSUBSET ERROR: None of the structures were kept!")
                sys.exit(1)
            self.setOutput(
                1, pipeio.Structures(self.output_files, self.num_output_sts))
            self.status = "COMPLETE"
            self.dump()
class DrugLikeSplitStage(stage.Stage):
    """
    Stage for splitting the input ligand structures into 3 groups:

    Drug-like ligands:  -dl.maegz
    Coarse ligands:     -co.maegz
    Left-over ligands:  -lo.maegz

    Based on the specified criteria.

    If at least one variant of a root matches all criteria for a drug-like
    ligand, then all variants of that root are included in the dl.maegz output
    file.
    If none match, then if at least one variant matches all criteria for coarse
    ligand, then all variants are included in the co.maegz file.
    Otherwise all variants are included in the lo.maegz file.

    All variants for the same root must be listed in blocks (next to each
    other) in the input file and have the same title.

    Also labels variants by setting s_vsw_variant field to <title>-#
    Where # is a variant number (1 to n).
    """
    def __init__(self, *args, **kwargs):
        """See class docstring."""
        # Configspec for the stage-specific keyword:
        specs = """
UNIQUEFIELD = string(default="s_m_title")
"""
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.status = "NOT STARTED"
        self.addExpectedInput(1, "structures", True)
        # All three outputs (drug-like, coarse, left-over) are optional,
        # since any of the groups may end up empty:
        self.addExpectedOutput(1, "structures", False)
        self.addExpectedOutput(2, "structures", False)
        self.addExpectedOutput(3, "structures", False)
    def operate(self):
        """
        Perform an operation on the input files.

        Reads every input structure, groups consecutive structures sharing
        the same UNIQUEFIELD value into "roots", classifies each root by the
        best qualification of any of its variants, and writes all variants
        of a root to the drug-like, coarse, or left-over output set.
        """
        ligands = self.getInput(1).getFiles()
        # Ev:98054 Print the filter criteria:
        self.info("Structures will be filtered on the following criteria:")
        self.info(" Drug-like criteria:")
        for prop, c in druglike_criteria.items():
            dl_min = c[1]
            dl_max = c[2]
            if dl_min and dl_max:
                self.info(" %s: between %s and %s" % (prop, dl_min, dl_max))
            elif dl_min:
                self.info(" %s: more than %s" % (prop, dl_min))
            elif dl_max:
                self.info(" %s: less than %s" % (prop, dl_max))
        self.info(
            " Coarse criteria (for structures which do not match the above criteria):"
        )
        for prop, c in druglike_criteria.items():
            co_min = c[0]
            co_max = c[3]
            if co_min and co_max:
                self.info(" %s: between %s and %s" % (prop, co_min, co_max))
            elif co_min:
                self.info(" %s: more than %s" % (prop, co_min))
            elif co_max:
                self.info(" %s: less than %s" % (prop, co_max))
        self.info(" All other structures are 'left-overs'")
        self.info("")
        # One multi-file writer per qualification class:
        self.dl_writer = structure.MultiFileStructureWriter(
            self.getOutputName(1), '.maegz')
        self.co_writer = structure.MultiFileStructureWriter(
            self.getOutputName(2), '.maegz')
        self.lo_writer = structure.MultiFileStructureWriter(
            self.getOutputName(3), '.maegz')
        self.num_dl_roots = 0
        self.num_co_roots = 0
        self.num_lo_roots = 0
        self.num_input_roots = 0
        self.num_input_sts = 0
        uniquefield = self['UNIQUEFIELD']
        self.info("Reading and splitting input files...")
        # It is possible that some structures had properties missing:
        self.missing_props = set()
        # Sentinel -1 means "no root seen yet". Variants of a root must be
        # consecutive in the input (see class docstring).
        current_root = -1
        current_root_qualification = LO
        root_sts = []
        for ligfile in ligands:
            self.info("\nReading file: %s" % ligfile)
            for st in structure.StructureReader(ligfile):
                self.num_input_sts += 1
                # Progress "dots":
                if self.num_input_sts % 1000 == 0:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                if st.property[uniquefield] != current_root:
                    self.num_input_roots += 1
                    # New root: flush the previous root's variants first.
                    if current_root != -1:
                        self.writeRoot(root_sts, current_root,
                                       current_root_qualification)
                    current_root = st.property[uniquefield]
                    root_sts = []
                    current_root_qualification = LO
                variant_qualification = self.qualify(st)
                # print "variant qualification:", variant_qualification
                # The root is classified by its best-qualifying variant:
                if variant_qualification > current_root_qualification:
                    current_root_qualification = variant_qualification
                root_sts.append(st)
        # write sts for last root to file:
        if current_root != -1:
            self.writeRoot(root_sts, current_root, current_root_qualification)
        self.dl_writer.close()
        self.co_writer.close()
        self.lo_writer.close()
        dl_output_files = self.dl_writer.getFiles()
        co_output_files = self.co_writer.getFiles()
        lo_output_files = self.lo_writer.getFiles()
        self.info('')
        for prop in self.missing_props:
            self.warning('ERROR some structures were missing property: %s' %
                         prop)
        self.info("Number of structures processed: %i" % self.num_input_sts)
        self.info(" compounds: %i" % self.num_input_roots)
        dl_count = self.dl_writer.getNumStructures()
        co_count = self.co_writer.getNumStructures()
        lo_count = self.lo_writer.getNumStructures()
        self.info("")
        self.info('Number of drug-like structures: %i' % dl_count)
        self.info(' compounds: %i' % self.num_dl_roots)
        self.info('Number of coarse structures: %i' % co_count)
        self.info(' compounds: %i' % self.num_co_roots)
        self.info('Number of left-over structures: %i' % lo_count)
        self.info(' compounds: %i' % self.num_lo_roots)
        self.info('')
        self.info('Total number of output structures: %i' %
                  (dl_count + co_count + lo_count))
        self.info(' compounds: %i' %
                  (self.num_dl_roots + self.num_co_roots + self.num_lo_roots))
        # Should never happen:
        if not dl_output_files and not co_output_files and not lo_output_files:
            self.error("NO OUTPUT FILES PRODUCED")
            # NOTE(review): other stages call self.exit() with a message
            # string; confirm an int argument is accepted here.
            self.exit(1)
        # Only register output sets that actually received files:
        if dl_output_files:
            self.setOutput(1, pipeio.Structures(dl_output_files, dl_count))
        if co_output_files:
            self.setOutput(2, pipeio.Structures(co_output_files, co_count))
        if lo_output_files:
            self.setOutput(3, pipeio.Structures(lo_output_files, lo_count))
[docs] def qualify(self, st):
"""
Returns module constant classification for the structure: Drug-like,
coarse, etc.
"""
qualifies_for_dl = True
for prop, c in druglike_criteria.items():
co_min = c[0]
dl_min = c[1]
dl_max = c[2]
co_max = c[3]
try:
value = st.property[prop]
except:
self.missing_props.add(prop)
# No QikProp properties
return LO
if co_max is not None and value > co_max:
return LO
if co_min is not None and value < co_min:
return LO
# if did not qualify for CO would not even go here
if qualifies_for_dl:
# If failed DL filter before, don't check again
if dl_max is not None and value > dl_max:
qualifies_for_dl = False
if dl_min is not None and value < dl_min:
qualifies_for_dl = False
if qualifies_for_dl:
return DL
else:
return CO
[docs] def writeRoot(self, root_sts, current_root, current_root_qualification):
for variant, st in enumerate(root_sts):
st.property['s_vsw_variant'] = "%s-%i" % (current_root, variant + 1)
if current_root_qualification == DL:
self.dl_writer.append(st)
elif current_root_qualification == CO:
self.co_writer.append(st)
else:
self.lo_writer.append(st)
if current_root_qualification == DL:
self.num_dl_roots += 1
elif current_root_qualification == CO:
self.num_co_roots += 1
else:
self.num_lo_roots += 1
class MergeDuplicatesStage(stage.Stage):
    """
    Stage for removing duplicate variants within each root (compound) based
    on unique SMILES strings.

    NOTE: All variants of each root must be in consecutive order in the input.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring; the per-keyword semantics are described in the
        configspec comments below.
        """
        specs = """
NEUTRALIZE = boolean(default=False) # Whether to neutralize the SMILES string before merging.
DESALT = boolean(default=False) # Whether to desalt the SMILES string before merging.
MERGE_PROPS = boolean(default=True) # Whether merge properties or to keep only the properties from the first-occuring structure. If merging, duplicate values are concatenated using commas.
OUTFORMAT = option("sdf", "canvascsv", "smiles", default="sdf") # Format to export the output to.
SMILES_FIELD = string(default="VendorSMILES") # Property name where to store the SMILES string
CODE_FIELD = string(default=None) # Property name where to store the unique compound code.
CODE_PREFIX = string(default=None) # String to prepend to the unique compound code.
REQUIRE_PROPS = boolean(default=False) # Whether to skip all structures that have no properties
"""
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)
[docs] def operate(self):
""" Perform an operation on the input files. """
in1 = self.getInput(1)
input_files = in1.getFiles()
# FIXME if restarting call with -restart option.
jobname = self.genFileName()
exe = os.path.join(os.environ["SCHRODINGER"], "utilities",
"merge_duplicates")
cmd = [exe, '-j', jobname]
if self['NEUTRALIZE']:
cmd.append('-neutralize')
if self['DESALT']:
cmd.append('-desalt')
if not self['MERGE_PROPS']:
cmd.append('-nomerge')
if self['OUTFORMAT'] == 'canvascsv':
cmd.append('-csv')
elif self['OUTFORMAT'] == 'cmiles':
cmd.append('-smiles')
elif self['OUTFORMAT'] == 'sdf':
pass
else:
raise ValueError("Invalid value for OUTFORMAT option: '%s'" %
self['OUTFORMAT'])
if self['SMILES_FIELD']:
cmd += ['-smilesfield', self['SMILES_FIELD']]
# Will catch both None and "None"
if str(self['CODE_FIELD']) != "None":
cmd += ['-codefield', self['CODE_FIELD']]
# Will catch both None and "None"
if self['CODE_PREFIX'] and str(self['CODE_PREFIX']) != "None":
cmd += ['-codeprefix', self['CODE_PREFIX']]
if self['REQUIRE_PROPS']:
cmd.append('-requireprops')
# FIXME add for the latest verion of the script:
# cmd += ['-max_per_file', '1000000']
cmd += input_files
print('RUNNING:', subprocess.list2cmdline(cmd))
logfile = jobname + '-merge.log'
with open(logfile, 'w') as fh:
out = subprocess.call(cmd, stdout=fh, stderr=fh)
print('')
if out != 0:
print('merge_duplicates process has terminated')
raise RuntimeError(
"ERROR: $SCHRODINGER/utilities/merge_duplicates exited with error code: %s"
% out)
print('merge_duplicates process has completed successfully')
output_files = []
num_kept_structures = None
with open(logfile) as fh:
for line in fh:
if line.startswith("Output file: "):
outputfile = line.rstrip('\r\n')[13:]
output_files.append(outputfile)
elif line.startswith("Number of unique structures exported:"):
num_kept_structures = int(line.split(':')[1])
if not output_files:
raise RuntimeError(
"ERROR: $SCHRODINGER/utilities/merge_duplicates did not return any files"
)
print('Output files:', output_files)
print('Number of output structures:', num_kept_structures)
self.setOutput(1, pipeio.Structures(output_files, num_kept_structures))
class GeneratePropsStage(stage.Stage):
    """
    Ev:85070

    For each structure in the input set, runs:
    $SCHRODINGER/run generate_ligfilter_properties.py, which generates
    properties specified via the ligfilter.predefined_function_dict.
    """
    def __init__(self, *args, **kwargs):
        """
        See class docstring.
        """
        # This stage has no stage-specific keywords:
        specs = """
"""
        stage.Stage.__init__(self, specs=specs, *args, **kwargs)
        self.jobdj = None  # JobDJ instance; created in setupJobs()
        self.status = "NOT STARTED"  # restart bookkeeping used by operate()
        self.addExpectedInput(1, "structures", True)
        self.addExpectedOutput(1, "structures", True)
    def setupJobs(self):
        """
        Sets up subjobs, which are distributed via JobDJ. There will be one
        subjob per each input file.
        """
        self.jobdj = self.getJobDJ()
        self.subjobnames = []
        self.input_files = self.getInput(1).getFiles()
        self.expected_output_files = {}  # key: subjobname
        for filenum, infile in enumerate(self.input_files):
            subjobname = self.genFileName(filenum=filenum + 1)
            outfile = self.genOutputFileName(1,
                                             filenum=filenum + 1,
                                             extension=".maegz")
            self.expected_output_files[subjobname] = outfile
            logfile = subjobname + '.log'
            # Remove the output/log files from previous run (if any):
            if os.path.isfile(outfile):
                os.remove(outfile)
            if os.path.isfile(logfile):
                os.remove(logfile)
            cmd = [
                'run', 'generate_ligfilter_properties_startup.py', infile,
                outfile, '-j', subjobname
            ]
            self.debug("CMD: %s" % subprocess.list2cmdline(cmd))
            self.jobdj.addJob(cmd, jobname=subjobname)
            self.subjobnames.append(subjobname)
[docs] def processJobOutputs(self):
"""
Analyzes the output files from the subjobs.
Raises a RuntimeError if there are any errors with the subjobs.
"""
filenum = 0
output_files = []
num_processed = 0
for jobname in self.subjobnames:
outfile = self.expected_output_files[jobname]
num_processed += self.num_processed[jobname]
self.checkFile(outfile)
output_files.append(outfile)
msg = "\nAll subjobs have completed. LigFilter properties have been generated."
msg += "\nNumber of processed structures: %i" % num_processed
msg += "\nOutput files: %s" % output_files
self.info(msg)
if not output_files:
self.error("\nNo output files!")
sys.exit(1)
# FIXME report the number of output structures:
self.setOutput(1, pipeio.Structures(output_files, num_processed))
    def operate(self):
        """
        Perform an operation on the input files. There are setup, running,
        and post-processing steps, and the stage records its current status so
        that it can be restarted in that step if there is a failure. Raises
        a RuntimeError if the JobDJ run() method fails, or if the stage
        finishes with an improper status.
        """
        if self.status == "NOT STARTED" or self.status == "SETTING UP JOBS":
            self.status = "SETTING UP JOBS"
            self.setupJobs()
            self._job_outputs = []
            # If restarting, go directly to next step.
            self.status = "RUNNING JOBS"
            self.dump()
        if self.status == "RUNNING JOBS":
            # Update JobDJ to correct options (may change if restarting):
            self.setJobDJOptions(self.jobdj)
            # Ev:64286 if restarting, do not restart jobs that have output
            # files:
            if self.jobdj.hasStarted():
                self.info("Checking for completed subjobs...")
                for i, job in enumerate(self.jobdj.all_jobs):
                    subjobname = self.subjobnames[i]
                    logfile = subjobname + '.log'
                    outfile = self.expected_output_files[subjobname]
                    if not os.path.isfile(logfile):
                        if job.isComplete():
                            self.jobdj.markForRestart(job, 'missinglog')
                        print('FAILED:', subjobname)
                        continue
                    # A "Number of processed structures" line marks a
                    # successfully finished subjob log:
                    job_complete = False
                    num_processed = None
                    with open(logfile) as fh:
                        for line in fh:
                            if line.startswith(
                                    "Number of processed structures:"):
                                s = line.split(":")
                                num_processed = int(s[1])
                                job_complete = True
                                break
                    if not job_complete:
                        if job.isComplete():
                            self.jobdj.markForRestart(job, 'incompletelog')
                        # NOTE(review): this flag is unconditionally reset
                        # before the post-run check below, so this assignment
                        # has no effect.
                        some_failed = True
                        print('FAILED:', subjobname)
                        continue
                    if not os.path.isfile(outfile):
                        if job.isComplete():
                            self.jobdj.markForRestart(job, "nooutput")
                        print('FAILED:', subjobname)
                        continue
                    # If got here then subjob is complete.
                    if not job.isComplete():
                        print(' Output exists for the job:', outfile)
                        print(' Marking %s as completed' % subjobname)
                        job._markFinished('found-output')
                        continue
                self.dump()
            self.info("Running Generate LigFilter Properties subjobs...")
            self.runJobDJ(self.jobdj)
            self.info("Checking subjob outputs...")
            self.num_processed = {}
            some_failed = False
            for i, job in enumerate(self.jobdj.all_jobs):
                subjobname = self.subjobnames[i]
                logfile = subjobname + '.log'
                outfile = self.expected_output_files[subjobname]
                if not os.path.isfile(logfile):
                    self.jobdj.markForRestart(job, "nolog")
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue
                # Parse the subjob log file:
                job_complete = False
                num_processed = 0  # Stays zero if no Summary line in file
                with open(logfile) as fh:
                    for line in fh:
                        if line.startswith("Number of processed structures:"):
                            s = line.split(":")
                            num_processed = int(s[1])
                            job_complete = True
                            break
                if not job_complete:
                    self.jobdj.markForRestart(job, 'incompletelog')
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue
                if not os.path.isfile(outfile):
                    self.jobdj.markForRestart(job, "nooutput")
                    some_failed = True
                    print('FAILED:', subjobname)
                    continue
                self.num_processed[subjobname] = num_processed
            if some_failed:
                self.dump()
                self.exit("Some jobs failed. Try restarting. Exiting...")
            self.status = "PROCESSING FILES"
            self.dump()
        if self.status == "PROCESSING FILES":
            self.processJobOutputs()
            self.status = "COMPLETE"
            self.dump()
class ChargeFilterStage(stage.Stage):
    """
    Stage-based class for filtering a set of structure files by total charge.

    MIN_CHARGE and MAX_CHARGE are the two keywords specific to this stage. If
    a structure has a total charge within [MIN_CHARGE, MAX_CHARGE] (inclusive),
    it is retained; otherwise, the structure is filtered out.

    The stage takes one input structure file set and generates one set of
    corresponding output structure files.
    """
[docs] def __init__(self, *args, **kwargs):
"""
See class docstring.
"""
stage.Stage.__init__(self, *args, **kwargs)
specs = """
MIN_CHARGE = integer(default=None)
MAX_CHARGE = integer(default=None)
"""
stage.Stage.__init__(self, specs=specs, *args, **kwargs)
self.addExpectedInput(1, "structures", True)
self.addExpectedOutput(1, "structures", True)
[docs] def operate(self):
"""
Read all the structures in the input files. If a structure's total
charge is between MIN_CHARGE and MAX_CHARGE, write it to a
the corresponding output file. Raises an IOError if there is a
problem reading an input file or writing an output file, and raises a
SystemExit if there are no output structures.
"""
input_files = self.getInput(1).getFiles()
min_charge = self['MIN_CHARGE']
max_charge = self['MAX_CHARGE']
msg = 'Retaining ligands whose charge falls within [%s, %s]' % \
(min_charge, max_charge)
self.debug(msg)
output_files = []
# FIXME: Generate ouly ONE output file (or up to 50K files)
num_passed = 0
filenum = 0
for infile in input_files:
self.info("Processing file %s..." % infile)
sts_in_file = 0
filenum += 1
outfile = self.genOutputFileName(1,
filenum=filenum,
extension='.maegz')
writer = structure.StructureWriter(outfile)
# FIXME: Add processing period printout
st_num = 0
for st in structure.StructureReader(infile):
st_num += 1
# Calculate the formal charge for the entire structure:
charge = st.formal_charge
msg = "File %i ligand %i charge: %s" % (filenum, st_num, charge)
self.debug(msg)
if min_charge is not None and charge < min_charge:
self.debug(' skipped')
continue # skip the ligand, and go to the next
if max_charge is not None and charge > max_charge:
self.debug(' skipped')
continue # skip the ligand, and go to the next
num_passed += 1
self.debug(' retained')
writer.append(st)
sts_in_file += 1
if sts_in_file: # If added anything to this file
output_files.append(outfile)
# Close last file:
writer.close()
if not output_files:
self.warning("None of the ligands have passed the Charge Filter!")
raise SystemExit
self.setOutput(1, pipeio.Structures(output_files, num_passed))
# EOF