# Source code for schrodinger.pipeline.stage

"""
Base classes for Pipeline stages.

To create a stage instance::

    stageobj = YourStage(<stagename>)

    # Add keywords:

    stageobj['NUM_RINGS'] = 2
    stageobj['UNIQUEFIELD'] = 's_m_title'

    # Add input objects:
    # ligand_files is a list of file names
    ligandsobj = pipeio.Structures(ligand_files)
    stageobj.setInput(1, 'INPUT1', ligandsobj)

    # Tell the stage where to save its output:
    stageobj.setOutputName(1, 'OUTPUT')

    # The integers (1) are position numbers.
    # 'INPUT1' is the variable name of the input object.

    # Run the stage:

    outputs = stageobj.run()

    # outputs is now a dictionary of output objects


Copyright Schrodinger, LLC. All rights reserved.

"""
import logging
import os
import pickle
import shutil
import sys
import time
from collections import UserDict

import schrodinger.job.queue as queue
import schrodinger.utils.log as log
from schrodinger import __version__
from schrodinger.application.inputconfig import InputConfig
from schrodinger.job import jobcontrol
from schrodinger.job import util
from schrodinger.utils import mmutil

logger = log.get_logger("schrodinger.pipeline.stage")
logger.setLevel(logging.INFO)
logger.propagate = False  # Don't duplicate messages upward
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

# Check whether SCHRODINGER_PYTHON_DEBUG is set for debugging:
DEBUG = (log.get_environ_log_level() <= log.DEBUG)

if DEBUG:
    logger.setLevel(logging.DEBUG)

USING_JOB_SERVER = mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER)


def get_time(t1, t2=0):
    """
    Returns a string of the time difference between t2 and t1 in human
    readable format. t2 and t1 are times, as returned by time.time().
    """
    timeString = ""
    if t2 == 0:
        elapsed = t1
    else:
        elapsed = int(t2) - int(t1)
    minutes = elapsed // 60
    hours = minutes // 60
    seconds = elapsed % 60
    minutes = minutes % 60
    if hours == 1:
        timeString = timeString + "1 hour "
    elif hours > 1:
        timeString = timeString + str(hours) + " hours "
    if minutes == 1:
        timeString = timeString + "1 minute "
    elif minutes > 1 or hours:
        timeString = timeString + str(minutes) + " minutes "
    if seconds == 1:
        timeString = timeString + "1 second"
    elif seconds > 1 or t2 != 0:
        timeString = timeString + str(seconds) + " seconds"
    return timeString

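# Illustrative example (not from the original source): for t1=1000 and
# t2=4725 the difference is 3725 seconds, i.e. 1 hour, 2 minutes and
# 5 seconds:
#
#     >>> get_time(1000, 4725)
#     '1 hour 2 minutes 5 seconds'
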
def _host_is_queue(hostname):
    """
    Return True if the specified host is a queue.
    """
    hostobj = jobcontrol.get_host(hostname)
    if hostobj:
        return hostobj.isQueue()
    else:
        return False

class Stage(UserDict):
    def __init__(self,
                 stagename,
                 specs=None,
                 allow_extra_keywords=False,
                 cleanup=True,
                 inpipeline=False,
                 driver_dir=None):
        """
        This is the Stage class. Derive your own class from it.

        :param stagename: full name for this stage (<jobname>-<stagename>)
        :param specs: ConfigObj specification for the supported keywords
        :param allow_extra_keywords: Whether to allow keywords that are
            not in the specification.
        :param cleanup: Whether to remove intermediate files
        :param inpipeline: Whether the stage is running within a Python
            Pipeline. If the stage is manually created, do NOT set this
            flag; Python Pipeline will set it as needed.
        :param driver_dir: Directory in which the driver is running.
        """
        self.specs = specs
        # If a string is specified, split it into a list:
        if isinstance(specs, str):
            self.specs = specs.split('\n')

        # Base this class on UserDict instead of InputConfig, because
        # InputConfig could not be pickled safely (Ev:87429).
        # NOTE: As of configobj 4.6.0 (suite2011), InputConfig can now be
        # pickled. Not removing this work-around for now (if it ain't
        # broken, don't fix it).
        self.allow_extra_keywords = allow_extra_keywords
        UserDict.__init__(self)

        self.stagename = stagename
        self._running_in_pipeline = inpipeline
        self._driver_dir = driver_dir
        self._expected_inputs = {}
        self._expected_outputs = {}
        self._required_products = set()
        self._input_objects = {}
        self._output_objects = {}
        self._input_names = {}  # Variable name of input at each position
        self._output_names = {}  # Variable name of output at each position
        self._restart_file = None
        self._idle_function = None  # NOT USED?
        self._has_started = False
        self._has_completed = False

        # Subjob options; modify via the setJobOptions() method:
        self._subjob_hosts_list = [('localhost', 1)]
        # None means the number of subjobs is not specified:
        self._target_njobs = None
        self._adjust_njobs = None  # default
        self._subjob_max_retries = None  # Number of restarts
        self._cleanup_requested = cleanup
        self._main_product = None  # Used to tell which host list to use

        # For updateJobdj():
        self._needed_cpus = None
        self._used_hosts = None
        self._last_message_time = None

    def validateValues(self, preserve_errors=False):
        """
        Validates the stored keywords. This is done by converting <self>
        to a ConfigObj instance and calling validate() on it. The
        validated keywords are then updated back into <self>. This is
        done as part of Ev:87429.
        """
        ic = InputConfig(dict(self), specs=self.specs)
        ic.validateValues(preserve_errors=preserve_errors)

        # Ev:101378 Exit with an error if unsupported keywords were
        # specified:
        if not self.allow_extra_keywords:
            import configobj
            extra_keywords = []
            for section, keyword in configobj.get_extra_values(ic):
                extra_keywords.append(keyword)
            if extra_keywords:
                raise RuntimeError(
                    "ERROR: The following unsupported keywords were "
                    "specified for this stage: %s" % ", ".join(extra_keywords))

        self.update(dict(ic))

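    # Hedged sketch (keyword names borrowed from the module docstring;
    # the spec syntax is standard ConfigObj/validate): a subclass would
    # pass a specification like this to Stage.__init__ via <specs>, and
    # validateValues() then coerces types and fills in defaults:
    #
    #     SPECS = """
    #     NUM_RINGS = integer(default=1)
    #     UNIQUEFIELD = string(default='s_m_title')
    #     """
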
    # FIXME: Implement getJobDJOptions() method which will return
    # verbosity, etc.
    def getVerbosity(self):
        """
        Return the verbosity of this stage (for JobDJ).
        """
        level = logger.level
        if level == log.WARNING:
            return 'quiet'
        elif level == log.DEBUG:
            return 'verbose'
        else:
            return 'normal'

    def mainProduct(self):
        """
        If a stage has a main product associated with it, the stage
        should override this method with one that returns the product
        string. For example, LigPrepStage.mainProduct() returns
        "ligprep". Used by Pipeline.
        """
        return self._main_product

    def setMainProduct(self, product):
        """
        Specify which product this stage is part of. This determines
        which host list the subjobs are run on.
        """
        self._main_product = product

    def addExpectedInput(self, position, type, required=True):
        """
        A stage can accept one or more pipeio input objects. Use this
        method to specify the type of input object that is expected at
        each position.

        position - an integer starting at 1.
        type - structures/grids/etc.
        required - whether this input always needs to be specified
        """
        position = int(position)  # Make sure that it's an integer
        required = bool(required)  # Make sure that it's a boolean
        self._expected_inputs[position] = {'type': type, 'required': required}

    def addExpectedOutput(self, position, type, always=True):
        """
        A stage can return one or more pipeio objects. Use this method to
        specify the type of object that will be returned and whether or
        not it will always be produced by the stage.

        position - an integer starting at 1.
        type - structures/grids/etc.
        always - whether this output is always produced
        """
        self._expected_outputs[position] = {'type': type, 'always': always}

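    # Sketch of typical usage from a subclass __init__ (the 'structures'
    # type name follows the "structures/grids/etc." convention above):
    #
    #     self.addExpectedInput(1, 'structures', required=True)
    #     self.addExpectedOutput(1, 'structures', always=True)
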
    def __getitem__(self, keyname):
        """
        Returns the value for the specified keyword, or its default
        value. Use as follows::

            precision_value = stageobj['PRECISION']

        Get keyword values this way from the stage's operate() to
        determine how to run. If the user did not specify a value for the
        keyword, the default value is returned. Raises KeyError if the
        keyword is not supported by the stage.
        """
        value = UserDict.__getitem__(self, keyname)

        # Files need to be treated specially (need run-time path):
        if keyname.endswith('_FILE') and value:
            # Return the runtime path of the file:
            value = self.getRuntimePath(value)
        return value

    def getStageDirectory(self):
        """
        Return the directory in which the stage is running.
        """
        return os.curdir

    def requiredProduct(self, product):
        """
        Specify a product that is required for this stage to run.
        Example: product="mmshare"
        """
        self._required_products.add(product)

    def _productPresent(self, product):
        """
        Internal: return True if the product is installed.
        """
        return bool(util.hunt(product))

    def checkProducts(self):
        """
        Raises RuntimeError if any of the required products are not
        installed. It is possible to override this method; see ligprep.py
        for an example.
        """
        missing_products = []
        for product in self._required_products:
            if not self._productPresent(product):
                missing_products.append(product)
        if missing_products:
            msg = "ERROR: The following products are not installed: "
            msg += ', '.join(missing_products)
            raise RuntimeError(msg)

    def getRuntimePath(self, filename):
        """
        Return the runtime path of a file that the user specified.
        Raises a RuntimeError (via self.exit) if the file does not exist.
        """
        self.debug("Determining runtime path of: %s" % filename)
        runtime_filename = jobcontrol.get_runtime_path(filename)
        if not os.path.exists(runtime_filename):
            msg = "ERROR: File does not exist: %s" % runtime_filename
            self.exit(msg)
        self.debug("Runtime path: %s" % runtime_filename)
        return runtime_filename

    def requiredProductRuntime(self, product):
        """
        Similar to requiredProduct(), but can be used to specify required
        products at runtime. For example, ConvertStage doesn't know what
        products are required for conversion until runtime.

        Raises RuntimeError if the product is not installed.
        """
        if not self._productPresent(product):
            msg = "ERROR: The following product is not installed: %s" % product
            raise RuntimeError(msg)

    def outputRequested(self, position):
        """
        Returns True if the user requested the optional output at
        <position>.
        """
        if position not in self._expected_outputs:
            msg = "ERROR: Output position not expected: %s" % position
            raise RuntimeError(msg)
        return position in self._output_names

    def setOutput(self, position, obj):
        """
        Use this method at the end of operate() to set the output.
        """
        if position not in self._expected_outputs:
            msg = "ERROR: Output position not expected: %s" % position
            raise RuntimeError(msg)
        if position not in self._output_names:
            msg = "WARNING: User did not request output at position: %s" % position
            print(msg)
        obj.check()  # Will kill the script if any files are missing
        self._output_objects[position] = obj

    def getOutput(self, position):
        """
        Returns the output IO object of the stage at the specified
        position. Use this method after running the stage to get its
        output objects.
        """
        return self._output_objects[position]

    def _inputCheck(self, position, input):
        """
        Makes sure that the position is valid and that the specified
        input object is OK for that position. If input is required for
        this position, input.check() is run.
        """
        if position not in self._expected_inputs:
            raise RuntimeError("Error: Position %i not defined." % position)

        expected_type = self._expected_inputs[position]['type']
        if input.type != expected_type:
            raise RuntimeError(
                "Error: Input Position %i Type '%s' does not match expected type '%s'!"
                % (position, input.type, expected_type))

        if self._expected_inputs[position]['required']:
            input.check()

    def __getstate__(self):
        """
        This method is called by the pickle module to return the state to
        be serialized by dump(). Since Job objects and callable functions
        can not be pickled, it removes them from a copy of the instance
        __dict__ attribute. _backend needs to be removed so that
        mmjobbe_terminate() is not called more than once when the job
        exits.
        """
        state_dict = dict(self.__dict__)
        state_dict['_idle_function'] = None
        state_dict['_driver_jobid'] = None
        state_dict['_stage_job'] = None
        state_dict['_backend'] = None
        return state_dict

    def dump(self):
        """
        Dumps all the variables of the Stage to the restart file. Run it
        every time an important step is performed.
        """
        if self._restart_file:
            with open(self._restart_file, "wb") as fh:
                pickle.dump(self, fh, protocol=2)

    def initNonPersistent(self, pipeline):
        """
        Called after the Stage joins a pipeline. Meant to be used to
        initialize non-persistent context.
        """
        pass

    def checkFile(self, file, error="File does not exist:"):
        """
        Raise an exception if the specified file does not exist. The
        message that is printed can be specified.
        """
        if not os.path.exists(file):
            msg = '%s %s' % (error, file)
            self.warning(msg)
            raise RuntimeError(msg)

    def checkFiles(self, files, error="File does not exist"):
        """
        Raise an exception if any of the files does not exist.
        """
        for f in files:
            if not os.path.exists(f):
                msg = '%s %s' % (error, f)
                self.warning(msg)
                raise RuntimeError(msg)

    def lognoret(self, *args):
        """
        Prints the specified objects to the stage log file, without a
        trailing newline.
        """
        for a in args:
            print(a, end=' ')
        sys.stdout.flush()

    def info(self, text):
        """
        Print an info line to the log file.
        """
        logger.info(text)
        sys.stdout.flush()  # Update the log file

    def debug(self, text):
        """
        Print a debug line to the log file.
        """
        logger.debug(text)
        sys.stdout.flush()  # Update the log file

    def warning(self, text):
        """
        Print a warning line to the log file.
        """
        logger.warning(text)
        sys.stdout.flush()  # Update the log file

    def error(self, text):
        """
        Print an error line to the log file.
        """
        logger.error(text)
        sys.stdout.flush()  # Update the log file

    def exit(self, text=""):
        """
        Stop the stage with the given error message (by raising
        RuntimeError; the stage job then exits with code 1).
        """
        raise RuntimeError(text)

    def reportParameters(self, fh=None):
        """
        Print the value of each keyword for this stage to the stream
        specified as <fh>. Used by Pipeline.
        """
        if not fh:
            fh = sys.stdout
        fh.write(" Stage: %s\n" % self.stagename)
        for key in self.keys():
            value = self[key]
            if value is not None and value != []:
                fh.write(" %s: %s\n" % (key, value))
        fh.flush()

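    # Illustrative output (stage name and keywords are hypothetical,
    # following the module docstring's example):
    #
    #     Stage: myjob-stage1
    #     NUM_RINGS: 2
    #     UNIQUEFIELD: s_m_title
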
    def checkParameters(self):
        """
        OVERWRITE: Make sure that all parameters are valid.
        """

    def hasStarted(self):
        """
        Returns True if this stage has started.
        """
        return self._has_started

    def hasCompleted(self):
        """
        Returns True if this stage's operate() exited successfully.
        """
        return self._has_completed

    def checkInputs(self):
        """
        OVERWRITE: Return False if something is wrong with the input
        files or the parameters; otherwise return True.
        """
        return True

    def getName(self):
        """
        Return the stagename (jobname of the stage).
        """
        return self.stagename

    def getInputNames(self):
        """
        Return a dictionary of the variable name of the input at each
        position. Key: position, value: name.
        """
        return self._input_names

    def setInput(self, position, name=None, obj=None):
        """
        Specify an input to use for this stage.

        position - input specified is for this position
        name - variable name of this IO object
        obj - the IO object

        This method is called by Pipeline.
        """
        if name:
            self._input_names[position] = name
        if obj:
            self._inputCheck(position, obj)
            self._input_objects[position] = obj

    def getInput(self, position):
        """
        Use in operate() to get the input object for the specified
        position. Returns None if an invalid position is specified.
        """
        return self._input_objects.get(position)

    def iterInputs(self):
        """
        Iterate over the input objects as (position, obj) pairs.
        """
        return self._input_objects.items()

    def setOutputName(self, position, varname):
        """
        Called by Pipeline when starting the stage. Tells the stage what
        name to save each output under.
        """
        # if position not in self._expected_outputs:
        #     raise KeyError("Stage %s does not expect output #%i!" %
        #                    (self.stagename, position))
        self._output_names[position] = varname

    def getOutputNames(self):
        """
        Return a dictionary of output names, keyed by position.
        """
        return self._output_names

    def getOutputName(self, position):
        """
        Return the output name for the specified position.
        """
        if position in self._output_names:
            return self._output_names[position]
        else:
            # Output at this position was not requested:
            return 'OUTPUT%i' % position

    def setJobOptions(self,
                      subjob_hosts=None,
                      njobs=None,
                      adjust=None,
                      force=None,
                      max_retries=None,
                      cleanup=None):
        """
        Tell this stage how to run the subjobs.

        :param subjob_hosts: list of hosts to run subjobs on
        :param njobs: number of subjobs to generate. None means determine
            automatically.
        :param adjust: whether to adjust njobs such that the job size is
            within limits
        :param force: whether to continue with the job if subjobs fail
        :param max_retries: number of times to attempt to restart a
            subjob. If not specified, use SCHRODINGER_MAX_RETRIES or 2.
        :param cleanup: whether to delete intermediate files
        """
        if subjob_hosts is not None:
            if not isinstance(subjob_hosts, list):
                print('INVALID HOSTS:', subjob_hosts)
                raise RuntimeError(
                    "Stage.setJobOptions() hosts must be given as a list")
            self._subjob_hosts_list = subjob_hosts[:]
        else:
            # Run subjobs on localhost:
            self._subjob_hosts_list = [('localhost', 1)]

        if njobs is not None:
            self._target_njobs = int(njobs)
        if adjust is not None:
            self._adjust_njobs = bool(adjust)
        if force is not None:
            self._force_jobs = bool(force)
        if max_retries is not None:
            self._subjob_max_retries = int(max_retries)
        if cleanup is not None:
            self._cleanup_requested = bool(cleanup)

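    # Illustrative call (values are assumptions): run up to 4 subjobs on
    # localhost, retry each failed subjob at most once, and keep
    # intermediate files:
    #
    #     stageobj.setJobOptions(subjob_hosts=[('localhost', 4)], njobs=4,
    #                            max_retries=1, cleanup=False)
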
    def getHostList(self):
        """
        Returns the list of hosts to run the subjobs on; localhost:1 may
        be in the list as well. Format: [(host1, ncpus), (host2, ncpus)].
        Pass this value to JobDJ.
        """
        return self._subjob_hosts_list

    def getHostStr(self):
        """
        Like getHostList(), but returns a host string (suitable for the
        -HOST argument) instead of a list.
        """
        return jobcontrol.host_list_to_str(self.getHostList())

    def areSubjobsAndStageOnSameHost(self):
        """
        Return True if the subjobs are running on the same host as the
        driver and the stages. This is always the case with JOB_SERVER,
        but not with classic Job Control.
        """
        if self.getHostList()[0][0] == "localhost":
            # If subjobs are running on localhost, so is the driver:
            return True
        if jobcontrol.get_backend_host_list() is None:
            # Driver is running on localhost:
            return False
        # Driver and subjobs are running on a remote host:
        return True

    def getNJobs(self):
        """
        Returns the requested target number of subjobs, and whether or
        not to adjust that number if it is unreasonable. If -NJOBS was
        not specified, the total number of CPUs in the host list is
        returned; if any host is listed without a CPU count, 10 is
        returned instead (with adjustment enabled).

        Used by the Glide DockingStage and _adjustNJobs().
        """
        # If a specific number of subjobs was specified, always use that:
        if self._target_njobs:
            if self._adjust_njobs:
                return (self._target_njobs, True)
            else:
                return (self._target_njobs, False)

        all_hosts_had_cpus = True
        total_cpus = 0
        for (hostname, ncpus) in self._subjob_hosts_list:
            if ncpus is None:
                all_hosts_had_cpus = False
                if _host_is_queue(hostname):
                    # "Unlimited" CPUs - queue host. Use 10 as an
                    # arbitrary number.
                    ncpus = 10
                else:
                    ncpus = 1  # Non-queue host specified without CPUs
            total_cpus += ncpus

        # Ev:125618 If a specific number of CPUs was specified, use that:
        if all_hosts_had_cpus:
            if self._adjust_njobs:
                return (total_cpus, True)
            else:
                # Do not adjust by default - use the number of CPUs:
                return (total_cpus, False)

        # If we got here, then neither the number of subjobs nor the
        # number of CPUs was specified (adjust the number of subjobs,
        # starting with 10):
        return (10, True)

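    # Worked example of the logic above: with -NJOBS unset and a host
    # list of [('hostA', 8)] (a hypothetical host), every entry carries a
    # CPU count, so getNJobs() returns (8, False); a queue host listed
    # without a CPU count instead falls through to the final (10, True).
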
    def _calcStsPerJob(self, total_mol, njobs):
        """
        Determine the number of structures that should go into each
        subjob.
        """
        # FIXME Use math.ceil() for simplicity?
        sts_per_file = int((total_mol / njobs) + 0.6)
        # Prevent creation of an extra tiny subjob at the end:
        while sts_per_file * njobs < total_mol:
            sts_per_file += 1
        return sts_per_file

    def _adjustNJobs(self, total_mol, min_job_size, max_job_size):
        """
        Adjusts the specified number of jobs so that the job size falls
        between the specified min and max. Also calculates the number of
        ligands that will go into each subjob.
        """
        # Do not separate tiny jobs:
        if total_mol <= min_job_size:
            return (total_mol, 1)

        max_njobs = 999
        max_ligands_per_file = 100000  # Roughly 1.5 GB

        # Limit the number of ligands per subjob:
        if max_job_size > max_ligands_per_file:
            max_job_size = max_ligands_per_file

        (njobs, adjust) = self.getNJobs()

        # Determine the initial njobs value:
        if self._target_njobs:
            # -NJOBS specified
            up_incremental = 1
        else:
            # No -NJOBS specified
            total_cpus = self.getNCpus()
            # Ev:56080 Split into multiples of ncpus:
            up_incremental = total_cpus

        # If -NJOBS is not specified, the number of subjobs starts at the
        # number of CPUs, is adjusted UP in increments of the CPU count,
        # and adjusted DOWN by 1.

        # Calculate sts_per_file for the initial njobs value:
        sts_per_file = self._calcStsPerJob(total_mol, njobs)

        # Make sure the job size is more than min_job_size. This only
        # runs when the specified -NJOBS was too big:
        while sts_per_file < min_job_size:
            njobs -= 1
            sts_per_file = self._calcStsPerJob(total_mol, njobs)
            if njobs == 1:
                break  # Can't have fewer than 1 job

        # Make sure the job size is less than max_job_size:
        while sts_per_file > max_job_size:
            njobs += up_incremental
            sts_per_file = self._calcStsPerJob(total_mol, njobs)
            if njobs == max_njobs:
                break  # Can't have more than max_njobs jobs

        return (sts_per_file, njobs)

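    # Worked example for _calcStsPerJob(): for total_mol=100 and njobs=7,
    # int(100 / 7 + 0.6) == 14, but 14 * 7 == 98 < 100, so the loop bumps
    # the result to 15 structures per file (six files of 15 and a last
    # file of 10, rather than an extra tiny eighth subjob).
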
    def getAdjustedNJobs(self, total_mol, min_job_size, max_job_size):
        """
        Returns the desired number of subjobs, adjusted for the specified
        min & max job sizes if the user specified the ADJUST option. If
        the number of desired jobs was not specified by the user, the
        number of available CPUs is used as the starting point (or 10 for
        queue hosts without a CPU count).

        Specify the number of input ligands and the smallest and largest
        desired job sizes (generally job lengths of 1 minute and 24
        hours).
        """
        (njobs, adjust) = self.getNJobs()

        if not adjust:
            # No adjustment; create -NJOBS number of jobs (or # of CPUs):
            n_st_per_file = self._calcStsPerJob(total_mol, njobs)
            return (n_st_per_file, njobs, False)

        (n_st_per_file, njobs) = self._adjustNJobs(total_mol, min_job_size,
                                                   max_job_size)
        adjusted = (njobs != self._target_njobs)
        return (n_st_per_file, njobs, adjusted)

    def JobDJOptions(self):
        """
        Returns a dictionary of options to pass to JobDJ: hosts,
        max_retries, max_failures, and verbosity.
        """
        kw = {}
        if self._driver_jobid:
            # If running under a driver job, use the processor-check-out
            # mechanism:
            kw['hosts'] = []
        else:
            # If running only one stage, use one CPU:
            kw['hosts'] = [('localhost', 1)]
        kw['max_retries'] = self.getMaxRetries()
        kw['max_failures'] = queue.NOLIMIT  # Do not exit when subjobs fail
        kw['verbosity'] = self.getVerbosity()
        return kw

    def setJobDJOptions(self, jobdj):
        """
        Use this method to adjust the specified queue.JobDJ instance to
        the VSW settings.
        """
        jobdj.max_retries = self.getMaxRetries()
        jobdj.max_failures = queue.NOLIMIT
        jobdj._verbosity = self.getVerbosity()
        if self._driver_jobid:
            # If running under a driver job, use the processor-check-out
            # mechanism:
            host_list = []
        else:
            # If running only one stage, use one CPU:
            host_list = [('localhost', 1)]
        jobdj.setHostList(host_list)
        jobdj._ignore_host_resource_checking = True

    def getJobDJ(self, **kwargs):
        """
        Returns a pre-configured JobDJ instance for the stage to use. It
        already has its hosts, max_retries, max_failures, and verbosity
        set.
        """
        useargs = self.JobDJOptions()
        # The stage may override these:
        useargs.update(kwargs)
        return queue.JobDJ(**useargs)

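    # Sketch of how a subclass's operate() might use this (the
    # subjob_commands list is hypothetical; JobControlJob is the standard
    # queue job wrapper):
    #
    #     jobdj = self.getJobDJ()
    #     for cmd in subjob_commands:
    #         jobdj.addJob(queue.JobControlJob(cmd))
    #     self.runJobDJ(jobdj)
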
    def getMaxRetries(self):
        """
        Return the maximum number of subjob restarts. If -max_retries was
        specified, returns that value; otherwise, if
        SCHRODINGER_MAX_RETRIES is defined, returns that value; otherwise
        returns the default of 2. Pass this value to JobDJ.
        """
        if self._subjob_max_retries is None:
            # User did not specify -max_retries:
            env_max_retries = os.getenv('SCHRODINGER_MAX_RETRIES')
            if env_max_retries:
                return int(env_max_retries)
            else:
                # Env variable not defined; use 2:
                return 2
        else:
            return self._subjob_max_retries

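    # Example of the precedence above: with setJobOptions(max_retries=5)
    # this returns 5; with no -max_retries but SCHRODINGER_MAX_RETRIES=3
    # in the environment it returns 3; with neither set it returns 2.
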
    def getNCpus(self):
        """
        Returns the total number of processors specified in the host
        string. For queue hosts with no CPU count, 10 is added.
        """
        total_cpus = 0
        for (hostname, ncpus) in self._subjob_hosts_list:
            if ncpus is None:
                if _host_is_queue(hostname):
                    # "Unlimited" CPUs - queue host. Use 10 as an
                    # arbitrary number.
                    ncpus = 10
                else:
                    ncpus = 1  # Non-queue host specified without CPUs
            total_cpus += ncpus
        return total_cpus

    def getCleanupRequested(self):
        """
        Stages should clean up after themselves if this returns True.
        """
        return self._cleanup_requested

    def _readMessageFromDriver(self):
        """
        Read the LAST message that the pipeline sent to this stage. This
        message tells the stage how many processors to use.
        """
        if not self._driver_jobid:
            return None  # No messages to read

        last_message = None
        while True:
            message = self._backend.nextMessage()
            if not message:
                # Return the last message (or None):
                return last_message
            last_message = message

    def updateJobdj(self, jobdj):
        """
        Called periodically in order to update JobDJ's host list. Asks
        Pipeline for CPUs when needed, and tells Pipeline when they are
        no longer needed.
        """
        if not self._driver_jobid:
            # No messages to read if not running under a Pipeline driver:
            return

        # Analyze queue.JobDJ:
        # FIXME these attributes are private:
        num_active = len(jobdj._running)
        num_completed = len(jobdj._finished)
        num_waiting = jobdj.total_added - num_active - num_completed

        old_needed_cpus = self._needed_cpus
        old_hosts = self._used_hosts
        self._used_hosts = jobdj.getActiveProcCounts()
        self._needed_cpus = num_waiting + num_active

        # Send a message to the driver if the hosts we are using have
        # changed, OR if the number of needed CPUs has changed (the
        # number of active+waiting jobs has decreased):
        send_message = False
        if self._used_hosts != old_hosts:
            send_message = True
        elif self._needed_cpus != old_needed_cpus:
            send_message = True

        if send_message:
            # Send a message to the driver stating two things:
            #   1. The hosts that we are currently using, AND
            #   2. The number of CPUs that we need (total).
            used_hosts_list = [
                (host, ncpus) for host, ncpus in self._used_hosts.items()
            ]
            used_hosts_str = jobcontrol.host_list_to_str(used_hosts_list)
            jobid = self._stage_job.JobId
            msg = "pipeline %s %i %s" % (jobid, self._needed_cpus,
                                         used_hosts_str)
            self.debug('DEBUG SENDING MESSAGE: "%s"' % msg)
            try:
                self._backend.sendMessageToParent(msg)
            except Exception as err:
                print('FAILED TO SEND MESSAGE TO DRIVER')
                print('MESSAGE: "%s"' % msg)
                print('EXCEPTION:', str(err))
                sys.stdout.flush()  # Update the log file
            self._last_message_time = time.time()
            msg = 'Requested %s total cpu(s); and said that we are using: %s' % (
                self._needed_cpus, used_hosts_str)
            self.debug(msg)

        message = self._readMessageFromDriver()
        if message:
            s = message.split(None, 2)  # Make 2 cuts
            if len(s) == 2:
                # No host list specified
                hosts_to_use = []
            else:
                hosts_to_use = jobcontrol.host_str_to_list(s[2])
            self.debug(' Have been told to use hosts: %s' % hosts_to_use)
            jobdj.setHostList(hosts_to_use)
            self._last_message_time = None
        sys.stdout.flush()

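    # Illustrative shape of the message exchange (field values are made
    # up; the host-string format is whatever jobcontrol.host_list_to_str()
    # produces). The stage sends:
    #
    #     "pipeline <stage_jobid> <needed_cpus> <used_hosts_str>"
    #
    # and the driver's reply is split into at most three fields; the
    # third field, when present, is the host list to use, parsed with
    # jobcontrol.host_str_to_list() and handed to jobdj.setHostList().
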
    def runJobDJ(self, jobdj):
        """
        Run the given JobDJ instance, updating its host list and
        checkpointing this stage as subjob states change.
        """

        def job_status_change(job):
            self.updateJobdj(jobdj)
            if job is not None:
                # Status changed:
                self.dump()

        def periodic_callback():
            self.updateJobdj(jobdj)

        jobdj.run(status_change_callback=job_status_change,
                  callback_interval=1,
                  periodic_callback=periodic_callback)

    def run(self,
            idle_function=None,
            restart_file=None,
            verbosity=None,
            logfh=None):
        """
        Run the stage.

        idle_function - function to call when idle
        restart_file - file to periodically dump this instance to
        verbosity - one of three verbosity levels:
            "quiet" - only warnings and errors are printed
            "normal" - stage progress is printed (default)
            "verbose" - additional debugging info is printed
        logfh - where to send the logging output
        """
        global logger
        global handler

        if verbosity is not None:
            if verbosity == 'quiet':
                logger.setLevel(log.WARNING)
            elif verbosity == 'normal':
                logger.setLevel(log.INFO)
            elif verbosity == 'verbose':
                logger.setLevel(log.DEBUG)

        if logfh is not None:
            # Set up the logger to log to logfh:
            logger.removeHandler(handler)
            handler = logging.StreamHandler(logfh)
            handler.setFormatter(logging.Formatter("%(message)s"))
            logger.addHandler(handler)

        self.debug('SCHRODINGER_HOSTS:%s' %
                   os.environ.get('SCHRODINGER_HOSTS'))

        if restart_file:
            self._restart_file = restart_file
            self.debug("Will periodically dump this stage to: %s" %
                       self._restart_file)

        # Set jobHost and masterJobID to appropriate values:
        self._backend = jobcontrol.get_backend()

        # Get the job object for the driver job (for messaging):
        self._driver_jobid = None
        self._stage_job = None
        if self._backend:  # Stage is under job control (unit tests are not)
            self._stage_job = self._backend.getJob()
            if self._running_in_pipeline and hasattr(self._stage_job,
                                                     'ParentJobId'):
                # If running within Python Pipeline AND the driver was
                # NOT run with -NOJOBID.
                # FIXME perhaps a better check is needed?
                # NOTE: We do NOT have access to the driver job record!
                # See Ev:130818 and Ev:132483
                # Allows implementation of Ev:105812 (running a stage
                # outside of Pipeline). Listen to Pipeline messages only
                # when running in a Python Pipeline workflow:
                self._driver_jobid = self._stage_job.ParentJobId
                self.debug(
                    'Will listen to "pipeline" messages from the driver')
                self._backend.addMessageName('pipeline')

        totalTimeStr = 'NA'
        self._idle_function = idle_function

        if not self._has_started:
            if not self._input_objects:
                if self._expected_inputs:
                    raise RuntimeError("NO INPUT OBJECTS FOUND!")
            if not self._output_names:
                if self._expected_outputs:
                    # FIXME allow optional outputs
                    raise RuntimeError("NO OUTPUT DESTINATIONS FOUND!")
            self.info("\nRunning stage: %s" % self.stagename)
            sys.stdout.flush()
        else:
            # Restarting...
            if self.hasCompleted():
                self.info('STAGE %s IS COMPLETE' % self.stagename)
            else:
                self.info('RESTARTING STAGE %s' % self.stagename)
                # Ev:98326 This forces the stage to request the CPUs
                # again:
                self._needed_cpus = None

        if not self.hasCompleted():
            # Starting from the beginning, or restarting an incomplete
            # stage:
            self._has_started = True
            startTime = time.time()
            self.debug("Running in debug mode")
            self.info("SCHRODINGER: %s" % os.environ.get('SCHRODINGER'))
            self.info("PYTHONPATH: %s" % os.environ.get('PYTHONPATH'))
            if self._backend:
                # Stage running under job control (not a unit test);
                # print the job ID of the stage:
                self.info("Job ID: %s" % self._backend.job_id)
            self.info("Time: %s" % time.asctime())

            # Convert keywords to the desired format and set unspecified
            # keywords to their defaults:
            if self.specs:
                try:
                    self.validateValues(preserve_errors=False)
                except Exception:
                    # self.exit("Failed to validate input keywords")
                    raise

            self.checkProducts()  # Make sure products are installed

            self.info('Stage started successfully\n')  # Ev:99539
            self.info("Python version: %s" % __version__)

            self.operate()  # Run the stage

            totalTimeStr = get_time(startTime, time.time())

            if not self._output_objects:
                if self._expected_outputs:  # FIXME and not optional
                    msg = 'Use setOutput() at the end of Stage.operate()'
                    print(msg)
                    raise RuntimeError(msg)

            self._has_completed = True
            self.dump()
            self.info("\nSTAGE COMPLETED. Elapsed time: " + totalTimeStr +
                      "\n")

        self.dump()
        # Otherwise the object would not be picklable:
        self._idle_function = None

        # Stage has completed.
        # Tell job control to copy the output files:
        out_files = []
        for position, obj in self._output_objects.items():
            obj.check()
            if position in self._output_names:
                # User requested this output:
                varname = ' (%s)' % self._output_names[position]
            else:
                varname = ''
            msg = " Output #%i%s " % (position, varname)
            msg += str(obj)
            self.info(msg)
            out_files += obj.getFiles()

        if self._backend:  # Stage running under job control
            for filename in out_files:
                base = os.path.basename(filename)
                # Move the output file to the driver's directory:
                dest_filename = os.path.join(self._driver_dir, base)
                self.info('Moving %s to %s' % (filename, dest_filename))
                shutil.move(filename, dest_filename)
                # For formality, record it as an output file in the job
                # record:
                self._backend.addOutputFile(base)

        if not USING_JOB_SERVER and self._backend:
            dest_filename = os.path.join(self._driver_dir,
                                         self._restart_file)
            try:
                shutil.copy(self._restart_file, dest_filename)
            except shutil.SameFileError:
                pass
            else:
                self.info('Copied %s to %s' %
                          (self._restart_file, dest_filename))

        # Check whether the stage has set all outputs that were expected:
        for position, out in self._expected_outputs.items():
            if position not in self._output_objects:
                if out['always']:
                    msg = "Stage %s did not set the output at position %i" % \
                        (self.stagename, position)
                    raise RuntimeError(msg)

        self.info('Done.')
        return self._output_objects

    def addOutputFile(self, filename):
        """
        Adds the specified file to the stage's job control record. The
        file must be specified as a local (not absolute) path.
        """
        if self._backend:  # Stage running under job control (usually true)
            self._backend.addOutputFile(filename)

    def operate(self):
        """
        OVERWRITE: Perform an operation on the input objects. Use
        self.setOutput(position, obj) to set the output objects.
        """

    def genFileName(self, extension=None, filenum=None, start=None, end=None):
        """
        Generate a file name to be used by the stage. Returns one of the
        following strings, depending on the given options::

            "<full-stagename>-<filenum><extension>"
            "<full-stagename>-<start>_<end><extension>"
            "<full-stagename><extension>"
            "<full-stagename>"
        """
        if not extension:
            extension = ''

        if filenum:
            nstr = '-' + str(int(filenum)).zfill(3)
        elif start and end:
            nstr = '-%i_%i' % (start, end)
        else:
            nstr = ""

        return "%s%s%s" % (self.stagename, nstr, extension)

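    # Examples (assuming a hypothetical stagename of "myjob-stage1"):
    #
    #     genFileName('.maegz', filenum=2)      -> 'myjob-stage1-002.maegz'
    #     genFileName('.log', start=1, end=50)  -> 'myjob-stage1-1_50.log'
    #     genFileName()                         -> 'myjob-stage1'
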
    def genOutputFileName(self,
                          position,
                          extension="",
                          filenum=None,
                          start=None,
                          end=None):
        """
        Generate a file name to be used by the stage when writing files
        for the output position <position>. Returns one of the following
        strings, depending on the given options::

            "<full-varname>-<filenum><extension>"
            "<full-varname>-<start>_<end><extension>"
            "<full-varname><extension>"
            "<full-varname>"
        """
        filename = self._output_names[position]
        if filenum:
            filename += '-%s' % str(int(filenum)).zfill(3)
        elif start and end:
            filename += '-%i_%i' % (start, end)
        if extension:
            filename += extension
        return filename

def Restart(restart_file, ignored=None):
    """
    Run this function in order to recover a saved Stage instance.
    """
    if not os.path.isfile(restart_file):
        msg = "stage.Restart(): File does not exist: %s" % restart_file
        raise RuntimeError(msg)
    with open(restart_file, "rb") as fh:
        stageobj = pickle.load(fh)
    return stageobj

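# Sketch of a typical restart (the file name is hypothetical): recover
# the pickled stage and re-run it, pointing it back at the same restart
# file so further progress keeps being checkpointed:
#
#     stageobj = Restart('mystage.restart')
#     outputs = stageobj.run(restart_file='mystage.restart')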