A Python Pipeline workflow consists of multiple stages that run serially, in parallel, or in a mixture of both. Each stage performs one specific operation on an input set (or sets) and returns one or more output sets. Each set is file-based and has one of the following types:
- Structures - a set of one or more structure file paths.
- Grid - a path to a grid file.
- Text - one or more text file paths.
- PhaseDB - a path to a Phase database.
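In a workflow input file, each set is declared in a [ SET:... ] section whose VARCLASS names its type. As a hypothetical illustration following the Structures pattern shown later in this document (the section name and file path are made up; check the Python Pipeline Manual for the exact VARCLASS spelling for each type):

[ SET:RECEPTOR_GRID ]
VARCLASS Grid
FILES /path/to/gridfile.zip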
You can also define custom set types by subclassing the PipeIO class.
Examples of operations that can be performed by a stage:
- Filtering of ligands.
- Running a program (like LigPrep or Glide) on ligands.
- Modifying a Phase database.
- Converting ligands from one file format to another.
Each stage can optionally accept keywords. Keywords are based on the ConfigObj format, except that no equals sign separates a keyword from its value. Values can be integers, floats, booleans, or strings. More complex value types are also supported; see existing stages for examples and the ConfigObj validation functions for details.
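To illustrate the keyword format only, here is a minimal standalone parser for "KEYWORD value" lines separated by whitespace rather than an equals sign. This is a hypothetical sketch, not the actual Pipeline code, which relies on ConfigObj validation specs to convert and check values:

```python
def parse_keywords(text):
    """Parse Pipeline-style 'KEYWORD value' lines (no equals signs).

    Illustrative only: converts values to int, float, or boolean where
    possible, and leaves everything else as a string.
    """
    keywords = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(" ")
        value = value.strip()
        if value.lower() in ("true", "false"):
            keywords[key] = value.lower() == "true"
        else:
            try:
                keywords[key] = int(value)
            except ValueError:
                try:
                    keywords[key] = float(value)
                except ValueError:
                    keywords[key] = value  # leave as a string
    return keywords
```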
There are many stages already written for tasks that are run by VSW, QPLD, CovalentDocking, and GeneratePhaseDB workflows. These can be found in the API documentation for the Pipeline stages package.
You can also write your own stage. The best way to get started is to read this manual and look at some existing stages.
A stage can optionally have these features:
- The ability to run simultaneous subjobs. This makes the stage more complex and requires the use of JobDJ.
- Restartability. Adding this feature allows the stage to restart from an intermediate point if the user’s job fails.
However, we recommend adding these features only after the stage is functioning properly without parallelization and restartability.
Here is an example of a basic stage class:
 1  from schrodinger.pipeline import pipeio, stage
 2  from schrodinger import structure  # for StructureWriter class
 3  from schrodinger.structutils import io  # for MultiFileReader class
 4
 5
 6  class ExampleStage(stage.Stage):
 7
 8      def __init__(self, *args, **kwargs):
 9          specs = """
10          KEEP_STS = integer(default=10)
11          """
12          stage.Stage.__init__(self, specs=specs, *args, **kwargs)
13
14          self.addExpectedInput(1, "structures", required=True)
15          self.addExpectedOutput(1, "structures", always=True)
16
17
18      def operate(self):
19          keep_sts = self['KEEP_STS']
20
21          # Read the input set:
22          input_files = self.getInput(1).getFiles()
23
24          # Prepare the output writer:
25          out_file = self.getOutputName(1) + ".maegz"
26          writer = structure.StructureWriter(out_file)
27
28          st_num = 0
29          for st in io.MultiFileReader(input_files):
30              writer.append(st)
31              st_num += 1
32              if st_num == keep_sts:
33                  break
34
35          writer.close()  # close the structure writer
36
37          # Report progress to the log file:
38          self.info("Total %i structures kept" % st_num)
39
40          self.setOutput(1, pipeio.Structures([out_file], st_num))
This stage keeps the first KEEP_STS structures it reads, passing them on to the next stage or output file.
Going through this example by line numbers:
- Lines 1-3: General imports for stage modules.
- Lines 9-12: Define the keywords for the stage, following ConfigObj specification.
- Line 14: Define the only supported input set. This is input set number 1, of type “structures”, and is required.
- Line 15: Define the only output set. This is output set number 1, of type “structures,” and is always generated.
- Line 18: operate is the entry point into the stage’s code and is a required method for all stages.
- Line 19: Access the value for the keyword. This is defined in the pipeline input file.
- Line 22: Get the list of file paths from the input set. These files are either specified in the pipeline input file, or are passed from a preceding stage.
- Line 25: Determine what the user would like to call the output files (from the pipeline input file).
- Line 26: Open a structure writer for writing the output file.
- Line 29: Read the input files (there may be more than one).
- Line 38: Each stage has self.info(), self.warning(), self.error(), and self.debug() methods for printing various messages to the log files.
- Line 40: Set the output set to a list of output files (in this case only one file). Optionally the number of items in this set can be specified (in this case, the number of structures).
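The loop in operate() follows a generic keep-first-N pattern: chain readers for each input file together and stop after N items. A standalone sketch of the same idea using only the standard library (no Schrodinger modules; the function name is made up for illustration):

```python
from itertools import chain, islice


def keep_first(readers, keep_sts):
    """Yield at most keep_sts items from a sequence of readers.

    Mirrors the operate() loop above: chain the per-file readers
    together (as MultiFileReader does for structure files) and stop
    after keep_sts items have been yielded.
    """
    return islice(chain.from_iterable(readers), keep_sts)
```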
Here is a sample input file that shows how the stage can be used:
[ SET:ORIGINAL_LIGANDS ]
VARCLASS Structures
FILES /Users/adzhigir/50ligs.maegz
[ STAGE:EXAMPLE_STAGE ]
STAGECLASS example.ExampleStage
INPUTS ORIGINAL_LIGANDS
OUTPUTS FILTERED_LIGANDS
KEEP_STS 20
[ USEROUTS ]
USEROUTS FILTERED_LIGANDS
STRUCTOUT FILTERED_LIGANDS
See the Python Pipeline Manual for more info on the workflow input file format.
Here is a sample log file from this stage:
Stage test-example-EXAMPLE_STAGE initializing...
Running stage: test-example-EXAMPLE_STAGE
SCHRODINGER: /software/schro2012
PYTHONPATH: None
Job ID: pc-16-21-111-0-4ed6bf94
Time: Wed Nov 30 15:43:17 2011
Stage started successfully
Python version: 0
Total 20 structures kept
STAGE COMPLETED. Elapsed time: 0 seconds
Output #1 (test-example-FILTERED_LIGANDS) [structures(20)]:
test-example-EXAMPLE_STAGE/test-example-FILTERED_LIGANDS.maegz
The jobcontrol module contains four major sections: job database interaction, job launching, job backend interaction, and job hosts. The job database section deals with getting information about existing jobs, the job launching section deals with starting up a subjob, and the job backend section provides utilities for a Python script running as a job.
To run a LigPrep job to convert a SMILES input file to 3D:

import os
import schrodinger.job.jobcontrol as jc

# options comes from your script's option parser; input_smiles_file is
# the path to the SMILES input file.
ligprep_output = options.jobname + ".mae"
ligprep = os.path.join(os.environ['SCHRODINGER'], 'ligprep')
command = [ligprep, "-i", input_smiles_file, "-o", ligprep_output]

job = jc.launch_job(command)
print("LigPrep job status: %s" % job.Status)
job.wait()  # block until the job finishes
print("LigPrep job status: %s" % job.Status)
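Outside of Schrodinger's job control, the same build-a-command-list, launch, and wait pattern can be sketched with the standard library's subprocess module. This is only an analogy, not a substitute for launch_job(), which also registers the job with the job database:

```python
import subprocess
import sys

# Build the command as a list of arguments, as with launch_job().
# Here the "subjob" is just a trivial Python one-liner:
command = [sys.executable, "-c", "print('hello from subjob')"]

# run() launches the process and waits for it, combining the roles of
# launch_job() and Job.wait() in this simplified analogy:
proc = subprocess.run(command, capture_output=True, text=True)

print("exit status: %d" % proc.returncode)
print(proc.stdout.strip())
```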