"""
Base classes for Pipeline stages.
To create a stage instance::

    stageobj = YourStage(<stagename>)
    # Add keywords:
    stageobj['NUM_RINGS'] = 2
    stageobj['UNIQUEFIELD'] = 's_m_title'
    # Add input objects:
    # ligand_files is a list of file names
    ligandsobj = pipeio.Structures(ligand_files)
    stageobj.setInput(1, 'INPUT1', ligandsobj)
    # Tell the stage where to save its output:
    stageobj.setOutputName(1, 'OUTPUT')
    # Integers (1) are position numbers.
    # INPUT1 is the name of the input object
    # Run the stage:
    outputs = stageobj.run()
    # outputs is now a dictionary of output objects

Copyright Schrodinger, LLC. All rights reserved.
"""
import logging
import math
import os
import pickle
import shutil
import sys
import time
from collections import UserDict

import schrodinger.job.queue as queue
import schrodinger.utils.log as log
from schrodinger import __version__
from schrodinger.application.inputconfig import InputConfig
from schrodinger.job import jobcontrol
from schrodinger.job import util
from schrodinger.utils import mmutil
# Module-wide logger for all stages. Messages are printed bare (no level or
# timestamp prefix) to stdout. Stage.run() may later swap the handler for one
# that writes to a caller-supplied stream.
logger = log.get_logger("schrodinger.pipeline.stage")
logger.propagate = False  # Don't duplicate messages upward

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

# Check whether SCHRODINGER_PYTHON_DEBUG is set for debugging:
DEBUG = (log.get_environ_log_level() <= log.DEBUG)
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)

# Whether the JOB_SERVER feature flag is on (affects restart-file copying).
USING_JOB_SERVER = mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER)
def get_time(t1, t2=0):
    """
    Return a human-readable string for an amount of time.

    Two calling modes are supported:

    - ``get_time(start, end)``: the elapsed time between two timestamps as
      returned by time.time(). Seconds are always reported, e.g.
      "2 minutes 0 seconds".
    - ``get_time(duration)`` (t2 == 0): *t1* itself is a duration in seconds.
      A zero seconds component is omitted unless nothing else was printed.

    Fractional seconds are truncated, so float inputs never produce output
    like "2.0 hours".

    :param t1: start timestamp, or a duration in seconds when t2 is 0
    :param t2: end timestamp; 0 selects the duration mode
    :return: formatted string such as "1 hour 5 minutes 3 seconds"
    """
    if t2 == 0:
        # Truncate so float durations format as whole units.
        elapsed = int(t1)
    else:
        elapsed = int(t2) - int(t1)

    minutes, seconds = divmod(elapsed, 60)
    hours, minutes = divmod(minutes, 60)

    parts = []
    if hours == 1:
        parts.append("1 hour ")
    elif hours > 1:
        parts.append(str(hours) + " hours ")
    if minutes == 1:
        parts.append("1 minute ")
    elif minutes > 1 or hours:
        parts.append(str(minutes) + " minutes ")
    if seconds == 1:
        parts.append("1 second")
    elif seconds > 1 or t2 != 0:
        parts.append(str(seconds) + " seconds")

    time_string = "".join(parts)
    # A zero (or sub-second) duration would otherwise yield an empty string.
    return time_string or "0 seconds"
def _host_is_queue(hostname):
    """
    Tell whether *hostname* refers to a queueing-system host.

    :param hostname: host name as it appears in the host list
    :return: the host object's isQueue() result, or False when jobcontrol
             does not know the host
    """
    host_entry = jobcontrol.get_host(hostname)
    return host_entry.isQueue() if host_entry else False
class Stage(UserDict):
    """
    Base class for Pipeline stages.

    Keywords are stored dict-style on the instance. Input/output pipeio
    objects are tracked by integer position. Subclasses implement operate()
    and call setOutput() for each output they produce.
    """

    def __init__(self,
                 stagename,
                 specs=None,
                 allow_extra_keywords=False,
                 cleanup=True,
                 inpipeline=False,
                 driver_dir=None):
        """
        This is the Stage class. Derive your own class from it.

        :param stagename: full name for this stage (<jobname>-<stagename>)
        :param specs: ConfigObj specification for the supported keywords
                      (a list of lines, or a newline-separated string)
        :param allow_extra_keywords: Whether to allow keywords that are not in
                                     the specification.
        :param cleanup: Whether to remove intermediate files
        :param inpipeline: Whether the stage is running within a Python
                           Pipeline. If the stage is manually created, do NOT
                           set this flag. Python Pipeline will set it as needed.
        :param driver_dir: Directory in which the driver is running.
        """
        # If a string is specified, split it into a list of lines:
        if isinstance(specs, str):
            specs = specs.split('\n')
        self.specs = specs
        # Base this class on UserDict instead of InputConfig, because
        # InputConfig could not be pickled safely (Ev:87429).
        #
        # NOTE: As of configobj 4.6.0 (suite2011), InputConfig can be pickled.
        # The work-around is kept for now.
        self.allow_extra_keywords = allow_extra_keywords
        UserDict.__init__(self)
        self.stagename = stagename
        self._running_in_pipeline = inpipeline
        self._driver_dir = driver_dir
        self._expected_inputs = {}
        self._expected_outputs = {}
        self._required_products = set()
        self._input_objects = {}
        self._output_objects = {}
        self._input_names = {}  # Variable name of input at each position
        self._output_names = {}  # Variable name of output at each position
        self._restart_file = None
        self._idle_function = None  # NOT USED?
        self._has_started = False
        self._has_completed = False
        # subjob options: Modify using setJobOptions() method
        self._subjob_hosts_list = [('localhost', 1)]
        # None means the number of subjobs is not specified
        self._target_njobs = None
        self._adjust_njobs = None  # default
        self._force_jobs = None  # Set by setJobOptions(force=...)
        self._subjob_max_retries = None  # Number of restarts
        self._cleanup_requested = cleanup
        self._main_product = None  # Used to tell which host list to use
        # Job-control context. These are (re)set by run(), but are initialised
        # here so that methods such as JobDJOptions() and addOutputFile() do
        # not raise AttributeError when used before run().
        self._backend = None
        self._driver_jobid = None
        self._stage_job = None
        # For updateJobdj():
        self._needed_cpus = None
        self._used_hosts = None
        self._last_message_time = None

    def validateValues(self, preserve_errors=False):
        """
        Validate the stored keywords.

        <self> is converted to an InputConfig (ConfigObj) instance and
        validated against self.specs. The validated (type-converted,
        defaulted) keywords are then copied back into <self>. This is done as
        part of Ev:87429.

        :raises RuntimeError: if unsupported keywords were specified and
                              allow_extra_keywords is False
        """
        ic = InputConfig(dict(self), specs=self.specs)
        ic.validateValues(preserve_errors=preserve_errors)
        # Ev:101378 Exit with an error if unsupported keywords were specified:
        if not self.allow_extra_keywords:
            import configobj
            extra_keywords = [
                keyword
                for _section, keyword in configobj.get_extra_values(ic)
            ]
            if extra_keywords:
                raise RuntimeError(
                    "ERROR: The following unsupported keywords were specified for this stage: %s"
                    % ", ".join(extra_keywords))
        self.update(dict(ic))

    # FIXME: Implement getJobDJOptions() method which will return verbosity,
    # etc.
    def getVerbosity(self):
        """ Return the verbosity of this stage (for JobDJ) """
        level = logger.level
        if level == log.WARNING:
            return 'quiet'
        elif level == log.DEBUG:
            return 'verbose'
        return 'normal'

    def mainProduct(self):
        """
        If a stage has a main product associated with it, the stage should
        overwrite this method with a method that returns the product string.
        For example, the LigPrepStage.mainProduct() will return "ligprep"
        Used by Pipeline.
        """
        return self._main_product

    def setMainProduct(self, product):
        """
        Specify which product this stage is part of. Will determine which host
        the subjobs are run on.
        """
        self._main_product = product

    def addExpectedOutput(self, position, type, always=True):
        """
        Declare a pipeio object that this stage returns.

        :param position: output position, an integer starting at 1
        :param type: object type (structures/grids/etc.)
        :param always: whether this output is always produced
        """
        self._expected_outputs[position] = {'type': type, 'always': always}

    def __getitem__(self, keyname):
        """
        Returns the value for specified keyword, or default value.

        Use as follows::

            precision_value = stageobj['PRECISION']

        Get values for keywords this way from stage's operate() to determine
        how to run. If the user did not specify a value for the keyword,
        default value is returned (after validateValues() has run).
        For keywords ending in "_FILE", the runtime path of the file is
        returned.

        :raises KeyError: if the keyword is not set
        """
        value = UserDict.__getitem__(self, keyname)
        # Files need to be treated specially (need run-time path):
        if keyname.endswith('_FILE') and value:
            value = self.getRuntimePath(value)
        return value

    def getStageDirectory(self):
        """ Return the directory in which the stage is running """
        return os.curdir

    def requiredProduct(self, product):
        """
        Specify a product that is required for this stage to run.
        Example: product="mmshare"
        """
        self._required_products.add(product)

    def _productPresent(self, product):
        """ Internal: return True if product installed"""
        return bool(util.hunt(product))

    def checkProducts(self):
        """
        Raises RuntimeError if any of the required products are not installed.
        It is possible to override this method. See ligprep.py for example.
        """
        missing_products = [
            product for product in self._required_products
            if not self._productPresent(product)
        ]
        if missing_products:
            msg = "ERROR: The following products are not installed: "
            msg += ', '.join(missing_products)
            raise RuntimeError(msg)

    def getRuntimePath(self, filename):
        """
        Return the runtime-path of a file that the user specified.

        :raises RuntimeError: (via exit()) if the file does not exist
        """
        self.debug("Determining runtime path of: %s" % filename)
        runtime_filename = jobcontrol.get_runtime_path(filename)
        if not os.path.exists(runtime_filename):
            msg = "ERROR: File does not exist: %s" % runtime_filename
            self.exit(msg)
        self.debug("Runtime path: %s" % runtime_filename)
        return runtime_filename

    def requiredProductRuntime(self, product):
        """
        Similar to requiredProduct() but can be used to specify required
        products at runtime. For example, ConvertStage doesn't know
        what products are required for conversion until runtime.
        Raises RuntimeError if product is not installed.
        """
        if not self._productPresent(product):
            msg = "ERROR: The following product is not installed: %s" % product
            raise RuntimeError(msg)

    def outputRequested(self, position):
        """
        Returns True if the user requested optional output at <position>

        :raises RuntimeError: if <position> is not an expected output
        """
        if position not in self._expected_outputs:
            msg = "ERROR: Output position not expected: %s" % position
            raise RuntimeError(msg)
        return position in self._output_names

    def setOutput(self, position, obj):
        """
        Use this method at the end of operate() to set the output.

        :raises RuntimeError: if <position> is not an expected output
        """
        if position not in self._expected_outputs:
            msg = "ERROR: Output position not expected: %s" % position
            raise RuntimeError(msg)
        if position not in self._output_names:
            msg = "WARNING: User did not request output at position: %s" % position
            print(msg)
        obj.check()  # Will kill script if any files are missing
        self._output_objects[position] = obj

    def getOutput(self, position):
        """
        Returns the output IO object of the stage at specified position.
        Use this method after running the stage to get its output objects
        """
        return self._output_objects[position]

    def _inputCheck(self, position, input):
        """
        Makes sure that position is valid, that the specified input
        object is OK for the position.
        If Input is required for this position, input.check() is run.
        """
        if position not in self._expected_inputs:
            raise RuntimeError("Error: Position %i not defined." % position)
        expected_type = self._expected_inputs[position]['type']
        if input.type != expected_type:
            raise RuntimeError(
                "Error: Input Position %i Type '%s' does not match expected type '%s'!"
                % (position, input.type, expected_type))
        if self._expected_inputs[position]['required']:
            input.check()

    def __getstate__(self):
        """
        This method is called by the pickle module to return the state to be
        serialized by dump().

        Since Job objects and callable functions can not be pickled, it
        removes them from a copy of the instance __dict__ attribute.
        _backend needs to be removed so that mmjobbe_terminate() would not
        be called more than once when the job exits.
        """
        state_dict = dict(self.__dict__)
        state_dict['_idle_function'] = None
        state_dict['_driver_jobid'] = None
        state_dict['_stage_job'] = None
        state_dict['_backend'] = None
        return state_dict

    def dump(self):
        """
        Pickle this stage to the restart file (no-op when none is set).
        Run it every time an important step is performed.

        The pickle is first written to "<restart_file>.tmp" and then moved
        over the restart file. A failure while pickling therefore leaves the
        previous restart file intact instead of truncated.
        """
        if not self._restart_file:
            return
        tmp_file = self._restart_file + '.tmp'
        try:
            with open(tmp_file, "wb") as fh:
                pickle.dump(self, fh, protocol=2)
            os.replace(tmp_file, self._restart_file)
        finally:
            # Only present if pickling or the replace failed.
            if os.path.exists(tmp_file):
                os.remove(tmp_file)

    def initNonPersistent(self, pipeline):
        """
        Gets called after the Stage joins pipeline. Meant to
        be used to initialize non-persistent context.
        """
        pass

    def checkFile(self, file, error="File does not exist:"):
        """
        Raise RuntimeError if specified file does not exist.
        The message ("<error> <file>") is also logged as a warning.
        """
        if not os.path.exists(file):
            msg = '%s %s' % (error, file)
            self.warning(msg)
            raise RuntimeError(msg)

    def checkFiles(self, files, error="File does not exist"):
        """ Raise RuntimeError for the first file in <files> that does not exist. """
        for f in files:
            self.checkFile(f, error)

    def lognoret(self, *args):
        """ Print the given objects to stdout, space-separated, without a newline """
        for a in args:
            print(a, end=' ')
        sys.stdout.flush()

    def info(self, text):
        """ Print an info line to the log file """
        logger.info(text)
        sys.stdout.flush()  # update the log file

    def debug(self, text):
        """ Print a debug line to the log file """
        logger.debug(text)
        sys.stdout.flush()  # update the log file

    def warning(self, text):
        """ Print a warning line to the log file """
        logger.warning(text)
        sys.stdout.flush()  # update the log file

    def error(self, text):
        """ Print an error line to the log file """
        logger.error(text)
        sys.stdout.flush()  # update the log file

    def exit(self, text=""):
        """ Abort the stage by raising RuntimeError(text) """
        raise RuntimeError(text)

    def reportParameters(self, fh=None):
        """
        Print the value of each keyword for this stage to the stream
        specified as <fh> (stdout by default). Keywords whose value is None
        or [] are skipped. Used by Pipeline.
        """
        if not fh:
            fh = sys.stdout
        fh.write("  Stage: %s\n" % self.stagename)
        for key in self.keys():
            value = self[key]
            if value is not None and value != []:
                fh.write("    %s: %s\n" % (key, value))
        fh.flush()

    def checkParameters(self):
        """ OVERWRITE: Make sure that all parameters are valid. """

    def hasStarted(self):
        """ Returns True if this stage has started. """
        return self._has_started

    def hasCompleted(self):
        """ Returns True if this stage's operate() exited successfully. """
        return self._has_completed

    def getName(self):
        """ Return stagename (jobname of the stage) """
        return self.stagename

    def setOutputName(self, position, varname):
        """
        Is called by Pipeline when starting the stage.
        Tell the stage what name to save each output under.
        """
        self._output_names[position] = varname

    def getOutputNames(self):
        """ Return the dict mapping each output position to its name """
        return self._output_names

    def getOutputName(self, position):
        """
        Return the output name for specified position, or 'OUTPUT<position>'
        if output at this position was not requested.
        """
        return self._output_names.get(position, 'OUTPUT%i' % position)

    def setJobOptions(self,
                      subjob_hosts=None,
                      njobs=None,
                      adjust=None,
                      force=None,
                      max_retries=None,
                      cleanup=None):
        """
        Tell this stage how to run the subjobs

        :param subjob_hosts: list of (host, ncpus) tuples to run subjobs on.
                             If None, the host list is reset to localhost:1.
        :param njobs: number of subjobs to generate. None means determine
                      automatically.
        :param adjust: whether to adjust njobs such that job size is within
                       limits
        :param force: whether to continue with job if subjobs fail
        :param max_retries: number of times to attempt to restart a subjob
                            If not specified, use SCHRODINGER_MAX_RETRIES or 2.
        :param cleanup: whether to delete intermediate files
        :raises RuntimeError: if subjob_hosts is not a list
        """
        if subjob_hosts is not None:
            if not isinstance(subjob_hosts, list):
                print('INVALID HOSTS:', subjob_hosts)
                raise RuntimeError(
                    "Stage.setJobOptions() hosts must be given as a list")
            self._subjob_hosts_list = subjob_hosts[:]
        else:
            # Run subjobs on localhost:
            self._subjob_hosts_list = [('localhost', 1)]
        if njobs is not None:
            self._target_njobs = int(njobs)
        if adjust is not None:
            self._adjust_njobs = bool(adjust)
        if force is not None:
            self._force_jobs = bool(force)
        if max_retries is not None:
            self._subjob_max_retries = int(max_retries)
        if cleanup is not None:
            self._cleanup_requested = bool(cleanup)

    def getHostList(self):
        """
        Returns a list of hosts to run the subjobs on. localhost:1 may be
        in the list as well.
        Format: [ (host1,ncpus), (host2,ncpus) ]
        Pass this value to JobDJ.
        """
        return self._subjob_hosts_list

    def getHostStr(self):
        """
        Just like getHostList() but instead of returning a list, returns a
        host string to be passed to the -HOST argument.
        """
        return jobcontrol.host_list_to_str(self.getHostList())

    def areSubjobsAndStageOnSameHost(self):
        """
        Return True if subjobs are running on the same host as the driver
        and the stages. This is always the case with JOB_SERVER but not with
        classic Job Control.
        """
        hosts = self.getHostList()
        if hosts and hosts[0][0] == "localhost":
            # If subjobs are running on localhost, so is the driver
            return True
        # No backend host list means the driver is running on localhost;
        # otherwise driver and subjobs are running on the remote host.
        return jobcontrol.get_backend_host_list() is not None

    @staticmethod
    def _cpusForHost(hostname, ncpus):
        """
        Return the CPU count to assume for one (hostname, ncpus) entry:
        ncpus if given, else 10 for a queue host, else 1.
        """
        if ncpus is not None:
            return ncpus
        if _host_is_queue(hostname):
            # "unlimited" CPUs - queue host. Use 10 as an arbitrary number
            return 10
        return 1  # Non-queue host specified without CPU

    def getNJobs(self):
        """
        Returns the requested target number of subjobs, and whether or
        not to adjust that number if it is unreasonable.

        - If -NJOBS was specified, that number is returned along with the
          ADJUST setting.
        - Otherwise, if every host in the host list carries an explicit CPU
          count (Ev:125618), the total CPU count is returned along with the
          ADJUST setting (no adjustment by default).
        - Otherwise (including an empty host list), (10, True) is returned.

        Used by Glide DockingStage and _adjustNJobs()
        """
        adjust = bool(self._adjust_njobs)
        # If specific number of subjobs was specified, always use that:
        if self._target_njobs:
            return (self._target_njobs, adjust)
        hosts = self._subjob_hosts_list
        if all(ncpus is not None for _host, ncpus in hosts):
            total_cpus = sum(ncpus for _host, ncpus in hosts)
            # Guard against an empty host list, which would yield 0 subjobs.
            if total_cpus > 0:
                return (total_cpus, adjust)
        # Neither # of subjobs nor # of CPUs was specified
        # (adjust the number of subjobs, starting with 10)
        return (10, True)

    def _calcStsPerJob(self, total_mol, njobs):
        """
        Return the number of structures that should go into each subjob:
        ceil(total_mol / njobs), so that no extra tiny subjob is needed at
        the end. njobs must be >= 1.
        """
        return math.ceil(total_mol / njobs)

    def _adjustNJobs(self, total_mol, min_job_size, max_job_size):
        """
        Adjust the number of jobs so that the job size ranges between the
        specified min and max. Also calculate the number of ligands that will
        be in each subjob.

        If -NJOBS is not specified, the number of subjobs starts with # of
        cpus, is adjusted UP in #cpu increments (Ev:56080), and is adjusted
        DOWN by 1. The number of subjobs is never raised above 999.

        :return: (structures per subjob, number of subjobs)
        """
        # Do not separate tiny jobs:
        if total_mol <= min_job_size:
            return (total_mol, 1)
        max_njobs = 999
        max_ligands_per_file = 100000  # Roughly 1.5 GB
        # Limit the number of ligands per subjob:
        max_job_size = min(max_job_size, max_ligands_per_file)
        njobs, _adjust = self.getNJobs()
        # Determine the step used when increasing njobs:
        if self._target_njobs:  # -NJOBS specified
            up_incremental = 1
        else:  # No -NJOBS specified; split into multiples of ncpus
            # At least 1, otherwise the loop below would never terminate.
            up_incremental = max(1, self.getNCpus())
        sts_per_file = self._calcStsPerJob(total_mol, njobs)
        # Make sure size of job is at least min_job_size
        # (only happens when the starting njobs was too big):
        while sts_per_file < min_job_size and njobs > 1:
            njobs -= 1
            sts_per_file = self._calcStsPerJob(total_mol, njobs)
        # Make sure size of job is at most max_job_size, without exceeding
        # max_njobs. Clamp instead of testing for equality, because steps
        # larger than 1 could otherwise jump over the limit.
        while sts_per_file > max_job_size and njobs < max_njobs:
            njobs = min(njobs + up_incremental, max_njobs)
            sts_per_file = self._calcStsPerJob(total_mol, njobs)
        return (sts_per_file, njobs)

    def getAdjustedNJobs(self, total_mol, min_job_size, max_job_size):
        """
        Returns the desired number of subjobs, and adjusts it for the
        specified min & max job sizes if the user specified ADJUST option
        (or if neither -NJOBS nor CPU counts were given).

        If the number of desired jobs was NOT specified by the user, the
        number of CPUs is used when every host specifies one, otherwise 10.

        Specify the number of input ligands and the smallest and largest
        desired job sizes (Generally job lengths of 1 minute & 24 hours).

        :return: (structures per subjob, number of subjobs,
                  whether njobs differs from the requested -NJOBS)
        """
        njobs, adjust = self.getNJobs()
        if not adjust:
            # No adjustment, create -NJOBS number of jobs (or CPUs):
            n_st_per_file = self._calcStsPerJob(total_mol, njobs)
            return (n_st_per_file, njobs, False)
        n_st_per_file, njobs = self._adjustNJobs(total_mol, min_job_size,
                                                 max_job_size)
        adjusted = (njobs != self._target_njobs)
        return (n_st_per_file, njobs, adjusted)

    def JobDJOptions(self):
        """
        Returns a dictionary of options to pass to JobDJ:
        hosts, max_retries, max_failures, verbosity
        """
        if self._driver_jobid:
            # If running under a driver job, use processor-check-out mechanism.
            hosts = []
        else:
            # If running only one stage, use one CPU:
            hosts = [('localhost', 1)]
        return {
            'hosts': hosts,
            'max_retries': self.getMaxRetries(),
            'max_failures': queue.NOLIMIT,  # Do not exit when subjobs fail
            'verbosity': self.getVerbosity(),
        }

    def setJobDJOptions(self, jobdj):
        """
        Use this method to adjust the specified queue.JobDJ instance
        to the VSW settings.
        """
        jobdj.max_retries = self.getMaxRetries()
        jobdj.max_failures = queue.NOLIMIT
        jobdj._verbosity = self.getVerbosity()
        if self._driver_jobid:
            # If running under a driver job, use processor-check-out mechanism.
            host_list = []
        else:
            # If running only one stage, use one CPU:
            host_list = [('localhost', 1)]
        jobdj.setHostList(host_list)
        jobdj._ignore_host_resource_checking = True

    def getJobDJ(self, **kwargs):
        """
        Returns a pre-set JobDJ instance for the stage to use.
        Its hosts, max_retries, max_failures, and verbosity are set from
        JobDJOptions(); any keyword arguments given here override those.
        """
        useargs = self.JobDJOptions()
        # Stage may overwrite:
        useargs.update(kwargs)
        return queue.JobDJ(**useargs)

    def getMaxRetries(self):
        """
        Return the number of max restarts to use.
        If -max_retries is specified, returns that value; otherwise
        if SCHRODINGER_MAX_RETRIES is defined, returns that value;
        otherwise returns default of 2.
        Pass this value to JobDJ.
        """
        if self._subjob_max_retries is not None:
            return self._subjob_max_retries
        # User did not specify -max_retries:
        env_max_retries = os.getenv('SCHRODINGER_MAX_RETRIES')
        if env_max_retries:
            return int(env_max_retries)
        return 2  # env var not defined

    def getNCpus(self):
        """
        Returns the total number of processors specified in the host list.
        For queued hosts with no CPU# specification, 10 is added; for other
        hosts without a CPU# specification, 1 is added.
        """
        return sum(
            self._cpusForHost(hostname, ncpus)
            for hostname, ncpus in self._subjob_hosts_list)

    def getCleanupRequested(self):
        """ Stages should clean up after themselves if this returns True """
        return self._cleanup_requested

    def _readMessageFromDriver(self):
        """
        Drain pending messages and return the LAST message that the pipeline
        sent to this stage (or None). This message tells this stage how many
        processors to use.
        """
        if not self._driver_jobid:
            return None  # No messages to read
        last_message = None
        while True:
            message = self._backend.nextMessage()
            if not message:
                return last_message
            last_message = message

    def updateJobdj(self, jobdj):
        """
        Gets called periodically in order to update JobDJ's hosts.
        Will ask Pipeline for CPUS when needed, and will tell Pipeline
        when they are no longer needed.
        """
        if not self._driver_jobid:
            # No messages to read if not running under a Pipeline driver.
            return
        # Analyze queue.JobDJ:
        # FIXME these attributes are private:
        num_active = len(jobdj._running)
        num_completed = len(jobdj._finished)
        num_waiting = jobdj.total_added - num_active - num_completed
        old_needed_cpus = self._needed_cpus
        old_hosts = self._used_hosts
        self._used_hosts = jobdj.getActiveProcCounts()
        self._needed_cpus = num_waiting + num_active
        # Send a message to the driver if the hosts in use or the number of
        # needed cpus has changed:
        send_message = (self._used_hosts != old_hosts or
                        self._needed_cpus != old_needed_cpus)
        if send_message:
            # Send a message to the driver stating 2 things:
            # 1. The hosts that we are currently using AND
            # 2. The number of cpus that we need (total)
            used_hosts_list = [
                (host, ncpus) for host, ncpus in self._used_hosts.items()
            ]
            used_hosts_str = jobcontrol.host_list_to_str(used_hosts_list)
            jobid = self._stage_job.JobId
            msg = "pipeline %s %i %s" % (jobid, self._needed_cpus,
                                         used_hosts_str)
            self.debug('DEBUG SENDING MESSAGE: "%s"' % msg)
            try:
                self._backend.sendMessageToParent(msg)
            except Exception as err:
                # Best-effort: report the failure but keep the stage running.
                print('FAILED TO SEND MESSAGE TO DRIVER')
                print('MESSAGE: "%s"' % msg)
                print('EXCEPTION:', str(err))
                sys.stdout.flush()  # Update the log file
            self._last_message_time = time.time()
            msg = 'Requested %s total cpu(s); and said that we are using: %s' % (
                self._needed_cpus, used_hosts_str)
            self.debug(msg)
        message = self._readMessageFromDriver()
        if message:
            # Expected format: "pipeline <jobid> [<host string>]"
            s = message.split(None, 2)  # Make 2 cuts
            if len(s) < 3:  # No host list specified
                hosts_to_use = []
            else:
                hosts_to_use = jobcontrol.host_str_to_list(s[2])
            self.debug('   Have been told to use hosts: %s' % hosts_to_use)
            jobdj.setHostList(hosts_to_use)
            self._last_message_time = None
        sys.stdout.flush()

    def runJobDJ(self, jobdj):
        """
        Run <jobdj>, updating its host list from Pipeline messages once per
        second and dumping the stage whenever a subjob changes status.
        """

        def job_status_change(job):
            self.updateJobdj(jobdj)
            if job is not None:
                # Status changed:
                self.dump()

        def periodic_callback():
            self.updateJobdj(jobdj)

        jobdj.run(status_change_callback=job_status_change,
                  callback_interval=1,
                  periodic_callback=periodic_callback)

    def run(self,
            idle_function=None,
            restart_file=None,
            verbosity=None,
            logfh=None):
        """
        Run the stage.

        :param idle_function: function to call when idle
        :param restart_file: file to periodically dump this instance to
        :param verbosity: one of three verbosity levels:
            "quiet" - only warnings and errors are printed
            "normal" - stage progress is printed - default
            "verbose" - additional debugging info is printed
        :param logfh: stream to send the logging output to
        :return: dict of output objects keyed by position
        :raises RuntimeError: if inputs/outputs are missing or the stage
                              fails to set an always-produced output
        """
        global handler
        if verbosity is not None:
            if verbosity == 'quiet':
                logger.setLevel(log.WARNING)
            elif verbosity == 'normal':
                logger.setLevel(log.INFO)
            elif verbosity == 'verbose':
                logger.setLevel(log.DEBUG)
        if logfh is not None:
            # Setup logger to log to logfh:
            logger.removeHandler(handler)
            handler = logging.StreamHandler(logfh)
            handler.setFormatter(logging.Formatter("%(message)s"))
            logger.addHandler(handler)

        self.debug('SCHRODINGER_HOSTS:%s' % os.environ.get('SCHRODINGER_HOSTS'))
        if restart_file:
            self._restart_file = restart_file
            self.debug("Will periodically dump this stage to: %s" %
                       self._restart_file)

        # Set jobHost and masterJobID to appropriate values:
        self._backend = jobcontrol.get_backend()
        # Get the job object for the driver job (for messaging):
        self._driver_jobid = None
        self._stage_job = None
        if self._backend:  # stage under jobcontrol (unit tests don't)
            self._stage_job = self._backend.getJob()
            if self._running_in_pipeline and hasattr(self._stage_job,
                                                     'ParentJobId'):
                # If running within Python Pipeline AND the driver was NOT run
                # with -NOJOBID.
                # FIXME perhaps a better check is needed?
                # NOTE: We do NOT have access to the driver job record!
                # See Ev:130818 and Ev:132483
                # Allows implementation of Ev:105812 (running stage outside of
                # Pipeline). Listen to Pipeline messages only when running in
                # a Python Pipeline workflow:
                self._driver_jobid = self._stage_job.ParentJobId
                self.debug('Will listen to "pipeline" messages from the driver')
                self._backend.addMessageName('pipeline')

        totalTimeStr = 'NA'
        self._idle_function = idle_function

        if not self._has_started:
            if not self._input_objects and self._expected_inputs:
                raise RuntimeError("NO INPUT OBJECTS FOUND!")
            # FIXME allow optional outputs
            if not self._output_names and self._expected_outputs:
                raise RuntimeError("NO OUTPUT DESTINATIONS FOUND!")
            self.info("\nRunning stage: %s" % self.stagename)
            sys.stdout.flush()
        else:  # Restarting...
            if self.hasCompleted():
                self.info('STAGE %s IS COMPLETE' % self.stagename)
            else:
                self.info('RESTARTING STAGE %s' % self.stagename)
                # Ev:98326 This will force the stage to request the CPUs again:
                self._needed_cpus = None

        if not self.hasCompleted():
            # Starting from beginning or restarting and not complete:
            self._has_started = True
            startTime = time.time()
            self.debug("Running in debug mode")
            self.info("SCHRODINGER: %s" % os.environ.get('SCHRODINGER'))
            self.info("PYTHONPATH: %s" % os.environ.get('PYTHONPATH'))
            if self._backend:  # Stage running under jobcontrol (not unit test)
                # Print jobid of the stage:
                self.info("Job ID: %s" % self._backend.job_id)
            self.info("Time: %s" % time.asctime())
            # Convert keywords to desired format and
            # set unspecified keywords to defaults:
            if self.specs:
                self.validateValues(preserve_errors=False)
            self.checkProducts()  # Make sure products are installed
            self.info('Stage started successfully\n')
            # Ev:99539
            self.info("Python version: %s" % __version__)
            self.operate()  # Run the stage
            totalTimeStr = get_time(startTime, time.time())
            # FIXME and not optional
            if not self._output_objects and self._expected_outputs:
                msg = 'Use setOutput() at the end of Stage.operate()'
                print(msg)
                raise RuntimeError(msg)
            self._has_completed = True
            self.dump()

        self.info("\nSTAGE COMPLETED. Elapsed time: " + totalTimeStr + "\n")
        self.dump()
        # Otherwise the object would not be pickable.
        self._idle_function = None

        # Stage has completed.
        # Tell jobcontrol to copy the output files:
        out_files = []
        for position, obj in self._output_objects.items():
            obj.check()
            if position in self._output_names:  # User requested this output
                varname = ' (%s)' % self._output_names[position]
            else:
                varname = ''
            msg = "  Output #%i%s " % (position, varname)
            msg += str(obj)
            self.info(msg)
            out_files += obj.getFiles()

        if self._backend:  # Stage running under jobcontrol
            for filename in out_files:
                base = os.path.basename(filename)
                # Move the output file to driver's directory:
                dest_filename = os.path.join(self._driver_dir, base)
                self.info('Moving %s to %s' % (filename, dest_filename))
                shutil.move(filename, dest_filename)
                # For formality, record it as an output file in the job record
                self._backend.addOutputFile(base)

        # Copy the restart file to the driver directory. Skipped when there is
        # no restart file or driver directory (os.path.join would raise
        # TypeError on None).
        if (not USING_JOB_SERVER and self._backend and self._restart_file and
                self._driver_dir is not None):
            dest_filename = os.path.join(self._driver_dir, self._restart_file)
            try:
                shutil.copy(self._restart_file, dest_filename)
            except shutil.SameFileError:
                pass
            else:
                self.info('Copied %s to %s' %
                          (self._restart_file, dest_filename))

        # Check to see if stage has set all outputs that were expected:
        for position, out in self._expected_outputs.items():
            if position not in self._output_objects and out['always']:
                msg = "Stage %s did not set the output at position %i" % \
                    (self.stagename, position)
                raise RuntimeError(msg)

        self.info('Done.')
        return self._output_objects

    def addOutputFile(self, filename):
        """
        Adds the specified file to the stage's job control record
        (no-op when not running under jobcontrol).
        File must be specified as local (not absolute) path.
        """
        if self._backend:  # Stage running under jobcontrol (usually true)
            self._backend.addOutputFile(filename)

    def operate(self):
        """
        OVERWRITE: Perform an operation on the input Objects.
        use self.setOutput(position, obj) to set output objects
        """

    def genFileName(self, extension=None, filenum=None, start=None, end=None):
        """
        Generate a file name to be used by the stage.

        Returns string::

            "<full-stagename>-<filenum><extension>"
            "<full-stagename>-<start>_<end><extension>"
            "<full-stagename><extension>"
            "<full-stagename>", etc.

        Depending on given options. filenum is zero-padded to 3 digits.
        """
        if not extension:
            extension = ''
        if filenum:
            nstr = '-' + str(int(filenum)).zfill(3)
        elif start and end:
            nstr = '-%i_%i' % (start, end)
        else:
            nstr = ""
        return "%s%s%s" % (self.stagename, nstr, extension)

    def genOutputFileName(self,
                          position,
                          extension="",
                          filenum=None,
                          start=None,
                          end=None):
        """
        Generate a file name to be used by the stage when writing
        files for the output position <position>.

        Returns strings::

            "<full-varname>-<filenum><extension>"
            "<full-varname>-<start>_<end><extension>"
            "<full-varname><extension>"
            "<full-varname>", etc.

        Depending on given options.

        :raises KeyError: if no output name was set for <position>
        """
        filename = self._output_names[position]
        if filenum:
            filename += '-%s' % str(int(filenum)).zfill(3)
        elif start and end:
            filename += '-%i_%i' % (start, end)
        if extension:
            filename += extension
        return filename
def Restart(restart_file, ignored=None):
    """
    Recover a Stage instance previously saved with Stage.dump().

    :param restart_file: path to the pickled stage
    :param ignored: unused; kept for backward compatibility
    :return: the unpickled stage object
    :raises RuntimeError: if restart_file is not an existing file
    """
    if os.path.isfile(restart_file):
        # NOTE(security): pickle.load can execute arbitrary code; only use
        # this on restart files produced by a trusted Stage.dump().
        with open(restart_file, "rb") as stream:
            return pickle.load(stream)
    raise RuntimeError("stage.Restart(): File does not exist: %s" %
                       restart_file)