Source code for schrodinger.pipeline.pipeline

"""
Classes for running pipelines.

The main class is called Pipeline. This class parses the input file, creates
appropriate stages, and runs them in their own subdirectories.

The StageJob class represents a pipeline job linked to a specific stage.

The IO (In/out object) classes (defined in pipeio.py) represent information
that is passed from one stage to another, such as a list of files. They are also
called Variables.


Input Object Syntax
===================

The Pipeline input file is used to specify which stages to run,
how to run them (parameters), what to use for input, and where
to send the output. An example input file looks like::

    SET MY_INPUT
        VARCLASS Structures
        FILE /home/adzhigir/vsw_testing/20confs.mae

The `SET` line value (`MY_INPUT`) specifies the name of the IO object.
The `VARCLASS` value (`Structures`) specifies the PipeIO class to create.
Pipeline uses `VARCLASS` to determine which variable class to instantiate:
it first searches the schrodinger.pipeline.pipeio module for the class name
specified on this line. If the class is not found there, Pipeline assumes a
custom class is specified by its full module path. (In that case, make sure
the custom module is in your `PYTHONPATH`.)

All lines following `VARCLASS` define what information to put into this
variable; in this case, a Maestro file (`20confs.mae`).
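
A custom variable class may also be referenced by its full module path. For
example (an illustrative sketch; `mymodule` is a hypothetical module defining
a `MyStructures` PipeIO class, and it must be on your `PYTHONPATH`)::

    SET MY_CUSTOM_INPUT
        VARCLASS mymodule.MyStructures
        FILE     /home/adzhigir/vsw_testing/20confs.mae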


Stage Syntax
============

An example stage definition in the input file looks like::

    STAGE MY_STAGE
        STAGECLASS  macromodel.ConfSearchStage
        INPUT       MY_INPUT
        OUTPUT      MY_OUTPUT
        FFLD        MMFFS

The `STAGE` line value (`MY_STAGE`) specifies the name of the stage.  The
`STAGECLASS` keyword specifies the `<module>.<class name>` that defines the
stage.  Pipeline uses `STAGECLASS` to determine which stage to create; in
addition to the modules on your `PYTHONPATH`, it will also search the
schrodinger.pipeline.stages namespace. Please make sure any custom stage
module is in your `PYTHONPATH`.

See `schrodinger.pipeline.stages.combine` for an example on how to
write a stage module.

Input variables for the stage are specified via `INPUT` keywords, and outputs
via `OUTPUT` keywords. The rest of the keywords tell the stage how to run.
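
For example, a second stage can consume the output of the first by naming
`MY_OUTPUT` as its input (an illustrative sketch; the stage class and the
variable names are placeholders)::

    STAGE MY_FILTER_STAGE
        STAGECLASS  filtering.LigFilterStage
        INPUT       MY_OUTPUT
        OUTPUT      MY_FILTERED_OUTPUT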

If you wish to run the Pipeline without using the pipeline startup
machinery::

    p = pipeline.Pipeline([options])
    p.readFile(<input file>)
    try:
        p.run()
    except RuntimeError:
        ...


If restartability is important, specify the `restart_file` when
constructing the Pipeline object.
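
For example (a minimal sketch; the job name, host string, and file names are
placeholders)::

    p = pipeline.Pipeline(jobname="my_workflow",
                          restart_file="my_workflow.restart")
    p.setOptions(subjob_hosts={'general': 'localhost:4'}, njobs=4)
    p.readFile("my_workflow.inp")
    try:
        p.run()
    except RuntimeError:
        ...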

To restart Pipeline, do::

    p = pipeline.Restart(restart_file [, restartbeg=True])
    try:
        p.run()
    except RuntimeError:
        ...


where `restart_file` is the same file that was passed to the Pipeline
constructor when the initial instance was created.

Copyright Schrodinger, LLC. All rights reserved.

"""

import os
import pickle
import shutil
import sys
import time
import weakref  # For weak references to prevent cyclic references

import configobj

import schrodinger.pipeline.stage as stage_module
import schrodinger.utils.fileutils as fileutils
from schrodinger.application import inputconfig
from schrodinger.infra import mm
from schrodinger.infra import mmjob
from schrodinger.job import jobcontrol
from schrodinger.job import launcher
from schrodinger.utils import log
from schrodinger.utils import mmutil

# Check whether SCHRODINGER_PYTHON_DEBUG is set for debugging:
DEBUG = (log.get_environ_log_level() <= log.DEBUG)

# JOB_SERVER does not support -LOCAL option, so stages will not be restartable
USING_JOB_SERVER = mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER)

# Statuses of stage jobs:
WAITING = 'WAITING'  # Not run yet
RUNNING = 'RUNNING'  # Running right now
COMPLETED = 'COMPLETED'  # Completed successfully
FAILED = 'FAILED'  # Failed
RESTARTING = 'RESTARTING'  # Failed earlier and marked for restart

# Dictionary of stages that were updated from the *.dump files.
# key: stagename, value: stage object.
# This variable is global because we don't want this info to go into
# the Pipeline *.restart file, as it is only temporary
updated_from_dump = {}

# For compatibility with old names/locations for stages:
# (keys are old names, values are new names)
old_stage_classes = {
    "merge.MergeStage": "glide.MergeStage",  # r2006
    "ligfilter.LigFilterStage": "filtering.LigFilterStage",  # r2006
    "phase.DBManage": "phase.DBManageStage",  # only some old r2008 builds
    # only some old r2008 builds
    "phase.DBConfSites": "phase.DBConfSitesStage",
    # Moved for r2010 alpha, Ev:95682
    "chargefilter.ChargeFilterStage": "filtering.ChargeFilterStage",
    "mmgbsa.MMGBSAStage": "prime.MMGBSAStage",  # Ev:95727, r2011 alpha
    "mopac.MopacStage": "semiemp.SemiEmpStage",  # Ev:112709 r2012 alpha
    "gencodes.GenCodesStage": "gencodes.Recombine",  # Ev:83703
}

# Pipeline can be told to print the logging information to any pipe,
# but the default is to print it to stdout, which corresponds to the
# main log file when running under jobcontrol:
global_logfh = sys.stdout


def log(text):
    """
    Prints specified text to the log pipe; adds a return at the end
    """
    global_logfh.write(text + '\n')
    global_logfh.flush()


def logn(text):
    """
    Print the specified text to the log pipe with no newline.
    This is especially useful when printing progress periods.
    """
    global_logfh.write(text)
    global_logfh.flush()


def add_host_lists(list1, list2):
    """
    Append hosts in list2 to list1. Example::

        list1 = a:5,b:10
        list2 = a:2,c:12
        output = a:7,b:10,c:12

    The order of hosts is retained (first list is given priority).
    """
    out_list = []
    list1_hostnames = []
    for host, cpus in list1:
        list1_hostnames.append(host)
        for newhost, newcpus in list2:
            if host == newhost:
                cpus += newcpus
        out_list.append((host, cpus))
    for host, cpus in list2:
        if host not in list1_hostnames:
            out_list.append((host, cpus))
    return out_list


def subtract_host_lists(list1, dict2):
    """
    Return available (not used) hosts.

    This function subtracts the used-host dict `dict2` from the available
    host list `list1`.

    :type list1: list
    :param list1: All available hosts (specified by user), as a list of
        (hostname, cpu count) tuples.

    :type dict2: dict
    :param dict2: All used hosts (used by stages), with hostname as key and
        used cpu count as value.
    """
    avail_list = []
    for host, cpus in list1:
        used_cpus = dict2.get(host, 0)
        if used_cpus == 0:
            avail_list.append((host, cpus))
        elif cpus is None:  # Unlimited
            avail_list.append((host, None))
        else:
            avail_cpus = cpus - used_cpus
            if avail_cpus:
                avail_list.append((host, avail_cpus))
    return avail_list
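

# Illustrative usage of the two host-list helpers above (hypothetical values;
# host lists are lists of (hostname, ncpus) tuples, where ncpus may be None
# for unlimited/queue hosts, and the "used" argument is a dict):
#
#     add_host_lists([('a', 5), ('b', 10)], [('a', 2), ('c', 12)])
#     # -> [('a', 7), ('b', 10), ('c', 12)]
#     subtract_host_lists([('a', 7), ('b', 10)], {'a': 3})
#     # -> [('a', 4), ('b', 10)]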


def _host_is_queue(hostname):
    """
    Return True if `hostname` is a queue.
    """
    hostobj = jobcontrol.get_host(hostname)
    if hostobj:
        return hostobj.isQueue()
    else:
        return False


_job_objects = {}  # Key: jobid, Value: Job object


def _get_job(jobid):
    """
    Return the job object for job with `jobid`.
    """
    if jobid not in _job_objects:
        jobobj = jobcontrol.Job(jobid)
        _job_objects[jobid] = jobobj
    return _job_objects[jobid]


def importName(modulename, name):
    """
    Import a named object from a module in the context of this function.

    For example, if you would like to create an instance of the Foo class
    from the bar.py module::

        foo_class = importName("bar", "Foo")
        foo_instance = foo_class()

    :raises ImportError: Raised when the object can not be imported.
    """
    # Described in Python Cookbook (2002, section 15.3)
    try:
        module = __import__(modulename, globals(), locals(), [name])
    except ImportError:
        msg = "Could not import module: %s" % modulename
        raise ImportError(msg)
    try:
        obj = vars(module)[name]
    except KeyError:
        msg = "Class %s is not defined in module %s!" % (name, modulename)
        raise ImportError(msg)
    return obj


class StageJob:
    """
    A "Job" that is used by Pipeline to run a Stage. Each StageJob has a
    Stage object associated with it.

    This object is periodically dumped to a file in order to support
    restartability. The process is called "dumping" and the file is the
    dump file. When Pipeline is restarted, each stage object is recovered
    from the associated dump file to get the latest state.
    """

    def __init__(self, stageobj, pipeline, module):
        """
        :param pipeline: Reference to the Pipeline object.

        :type stageobj: Stage
        :param stageobj: Stage to run.

        :param module: The module where the above stage is defined.
        """
        self.stageobj = stageobj  # Stage object that is run by this stage job
        self.pipeline = pipeline
        self.module = module
        self.jobid = ""  # JobID of this stage job

        # Can be: WAITING, RUNNING, COMPLETED, FAILED, or RESTARTING
        self.status = WAITING

        # Hosts that are used by this stage AT THIS MOMENT
        self.checked_out_hosts = []
        self.num_needed_cpus = 0  # Number of CPUS needed by the stage
        self.num_given_cpus = 0  # Number of CPUS given to the stage to use
        self.host_pool = None

        # The reason why the stage failed (e.g. "missing")
        self.died_action = None

    # Can't set the host_pool here, since self.pipeline.host_dict is not
    # set yet
    def _setHostPool(self):
        """
        Set the self.host_pool variable based on the stage's product.
        """
        # This method was created due to Ev:105135. The
        # self.pipeline.host_dict needs to be read AFTER it is set, so can't
        # be done in the constructor of this class, since
        # pipeline.setOptions() is called afterwards.
        if self.host_pool is not None:
            return

        # Can be equal to None as well:
        product = self.stageobj.mainProduct()

        # User specified special host list for this product
        if product in self.pipeline.host_dict:
            self.host_pool = product
        else:
            # Use the general host list
            self.host_pool = 'general'

    def setUsedHosts(self, new_host_list):
        diff = (new_host_list == self.checked_out_hosts)
        self.checked_out_hosts = new_host_list
        self.num_given_cpus = 0
        for host, cpus in self.checked_out_hosts:
            self.num_given_cpus += cpus
        return diff

    def getUnusedHostList(self):
        # List of all AVAILABLE hosts for this product:
        self._setHostPool()
        hostlist = self.pipeline.host_dict[self.host_pool]

        # Dict of all hosts used by all stages for this product
        used_hosts_dict = self.pipeline.getUsedHosts(self.host_pool)

        # List of hosts/cpus that are not used (available):
        unused_hosts_list = subtract_host_lists(hostlist, used_hosts_dict)
        return unused_hosts_list

    def updateStageCpus(self):
        """
        Based on current host usage and number of needed cpus, determine
        which hosts this stage should use and send them to it in a message.
        """
        requested_cpus = self.num_needed_cpus - self.num_given_cpus

        # List of new hosts for the stage to use:
        additional_hosts_to_use = []
        for hostname, available_cpus in self.getUnusedHostList():
            sys.stdout.flush()
            if available_cpus is None or (available_cpus >= requested_cpus):
                # Unlimited CPUS OR there are enough processors available
                additional_hosts_to_use.append((hostname, requested_cpus))
                requested_cpus = 0
                break  # Satisfied the request
            elif available_cpus:
                # At least some are available
                additional_hosts_to_use.append((hostname, available_cpus))
                requested_cpus -= available_cpus

        if DEBUG:
            print(' Addnl CPUs requested: %i, Addnl hosts granted: %s' %
                  (self.num_needed_cpus, additional_hosts_to_use))

        # Append additional cpus to the checked_out_hosts list:
        new_host_list = add_host_lists(self.checked_out_hosts,
                                       additional_hosts_to_use)
        self.setUsedHosts(new_host_list)

        # Send message to stage updating its host list:
        self.sendHostsToUse()

    def sendHostsToUse(self):
        """
        Send a message to the stage job telling it how many CPUS to use.
        Gets called periodically in case messages don't go through.
        """
        name = self.stageobj.stagename
        if DEBUG:
            print('Telling stage %s to use hosts: %s' %
                  (name, self.checked_out_hosts))
        use_hosts_str = jobcontrol.host_list_to_str(self.checked_out_hosts)
        msg = 'pipeline use %s' % use_hosts_str
        if DEBUG:
            print('DEBUG SENDING MESSAGE: "%s"' % msg)
        mmjob.mmjobbe_send(self.jobid, msg)
        sys.stdout.flush()

    def restart(self, action):
        """
        Mark this job to be restarted by the Pipeline.
        """
        self.status = RESTARTING
        self.printAction(action)

    def finish(self):
        """
        Sets the pipe's stage object to the final finished stage object from
        the dump file, and parses all of the outputs.
        """
        self.status = COMPLETED
        self.printAction('completed')

        sjname = self.stageobj.stagename
        output_dump_file = sjname + ".out"
        sys.stdout.flush()
        if not os.path.isfile(output_dump_file):
            print("ERROR: No output from stage!")
            sys.stdout.flush()
            self.died('failed')  # quits the pipeline.

        # The stage executable wrote the output objects to a file.
        # Try to read that file:
        with open(output_dump_file, "rb") as fh:
            outputs = pickle.load(fh)
        sys.stdout.flush()

        output_objects = {}
        for position, obj in outputs.items():
            obj.check()
            outnames = self.stageobj.getOutputNames()
            if position not in outnames:
                # User did not request this output
                continue
            varname = outnames[position]
            sys.stdout.flush()
            obj.name = varname
            output_objects[varname] = obj
            if obj.isFilled():
                count = obj.getCount()
                if count is not None:
                    logn(" Output: %s (%s)" % (varname, count))
                else:
                    logn(" Output: %s" % varname)
            else:
                logn(" Output: %s (empty)" % varname)

            # FIXME: Add ability to return the number of structures:
            if obj.type == 'ligands':  # New is "structures"
                logn(" Counting.")
                count = obj.count()
                log(" Number of structures: %i" % count)
            else:
                log('')
            sys.stdout.flush()

        for varname, var_obj in output_objects.items():
            self.pipeline._objects[varname] = var_obj
            self.pipeline.intermediate_varnames.append(varname)

    def readyToRun(self, objects):
        """
        Return True if this StageJob has all inputs that are required for it
        to start.
        """
        if self.status == RESTARTING:
            sys.stdout.flush()
            return True
        elif self.status == WAITING:
            inputs_ready = True
            for varname in self.stageobj.getInputNames().values():
                if varname not in objects:
                    inputs_ready = False
                    break
            if not inputs_ready:
                return False
            return True
        else:
            # Is running or has finished:
            return False

    def printAction(self, action):
        """
        Call the Pipeline's printAction method.
        """
        self.pipeline.printAction(self.stageobj.stagename, self.jobid, action)

    def died(self, action):
        """
        Mark this stage as failed.

        The "action" gets saved in the "died_action" attribute, and will be
        printed out at the end of the workflow.

        This gets called every time a StageJob dies by raising a RuntimeError
        exception.
        """
        self.printAction(action)
        self.died_action = action

        # When the Pipeline will be restarted, this stage will be restarted
        # too:
        self.status = FAILED

        # Throw the CPUs used by this stage back into the common pool:
        self.num_needed_cpus = 0  # Number of CPUS needed by the stage
        self.setUsedHosts([])
        self.pipeline.distributeCpus()

        self.pipeline.dump()  # Save the state of the pipeline

    def printFailureMessage(self):
        """
        Print the failure status of the stage and the path to the log file.
        """
        print("Stage %s %s" % (self.stageobj.stagename, self.died_action))
        log_file = self.stageobj.stagename + '.log'
        if os.path.exists(log_file):
            print(" SEE LOG FILE:", os.path.abspath(log_file))
        else:
            print(" No log file to examine; missing file: %s" % log_file)
        sys.stdout.flush()

    def updateFromDump(self, quiet=False):
        """
        Update this stage of the pipeline to the latest state from the dump
        file.
        """
        sjname = self.stageobj.stagename
        stage_dump_file = sjname + ".dump"

        if os.path.isfile(stage_dump_file):
            try:
                latest_stageobj = stage_module.Restart(stage_dump_file)
            except Exception as err:
                if DEBUG:
                    print('EXCEPTION FROM stage.Restart():', err)
                msg = ('Pipeline: WARNING: Failed to update %s from dump '
                       'file! Will restart from beginning' % sjname)
                log(msg)
            else:
                # update successful
                updated_from_dump[sjname] = latest_stageobj
                if not quiet:
                    msg = ('Pipeline: Updated stage %s from dump file '
                           'successfully.' % sjname)
                    log(msg)
        else:
            msg = ('Pipeline: WARNING: No dump file exists for stage %s! '
                   'Will restart from beginning' % sjname)
            log(msg)
def _writeStageExecutable(self, destination): """ Write a Stage executable python script that will be submitted under jobcontrol, and save it to `destination`. :type destination: str :param destination: The filename to write the stage executable python script to. """ stage_class_module = self.module ####################################################################### string = """ import os import pickle import sys script = sys.argv[0] try: import {0} except ImportError: raise ImportError(script + ': Could not import module <{0}>.') stagename = os.path.splitext(script)[0] restart_file = stagename + '.dump' try: # Load the stage dump file: with open(restart_file, "rb") as fh: stage = pickle.load(fh) except Exception: raise RuntimeError(script + ': Could not load stage from dump file') ######### MODIFY THIS SO THAT THE OPTIONS ARE UPGRADED EVEN WHEN RESTARTING ### if not stage.hasStarted(): # If NOT restarting print('Stage', stage.stagename, 'initializing...') for position, obj in stage.iterInputs(): obj.check() # Check to see if the object is valid else: # Restarting print('Stage', stage.stagename, 'preparing to restart...') # Periodically dump this instance to the dump file: # Run the instance: try: outputs = stage.run(restart_file=restart_file) except RuntimeError as err: print(err) # Print the error without traceback sys.exit(1) # Exit this script # Dump the outputs to a dump file: try: with open(stagename + '.out', 'wb') as fh: pickle.dump(outputs, fh, protocol=2) except Exception: raise RuntimeError(script + ': Could not write the output file') """.format(stage_class_module) with open(destination, "w") as fh: fh.write(string)


class Pipeline:
    """
    A controller responsible for running the stages in the correct order.

    Pipeline parses the input file, creates instances of all IO objects,
    stage objects, and stage job objects, submits the stages in the
    appropriate directories, and waits for them to finish. Once a stage
    finishes, it starts any stages that depend on its output. When all
    stages are complete, it presents the user with the USER OUTPUT objects -
    IO output objects that are to be returned by the pipeline.
    """
[docs] def __init__( self, jobname="pipeline", # Jobname of the pipeline prog=None, # Name of the program logfh=None, # Pipe where log text is to be sent to restart_file=None # Periodically dump pipeline here ): self.jobname = jobname self.prog = prog # If no restart file specified, use <jobname>.restart in CWD: if restart_file: self.restart_file = restart_file else: # Ev:63865 Save the restart file as local path: self.restart_file = jobname + ".restart" # If log file specified, dump all output to that file: global global_logfh if logfh: global_logfh = logfh else: global_logfh = sys.stdout # Write to standard out by default self.idle_function = None # Function to run periodically self.stagejobs = [] # List of all stage jobs. self._objects = {} # dictionary containing the actual data objects # where keys are string variable names. # variable names for variables that are self.intermediate_varnames = [] # stage outputs (workflow inputs are # excluded) # variable names that need to be available to the user when the job # finishes. self._user_outputs = set() # Job control will copy these files back to the # launch directory. # Variable name that is to be incorporated into Maestro self._structure_output = None self.host_dict = {} # key: product pool; value: host string # Default value for -host (and -HOST sometimes) self.host_dict['general'] = [('localhost', 1)] # Default value for -njobs (Use number of hosts as -NJOBS), Ev:117548 self.njobs = None # Default value for -adjust (adjust only if -NJOBS was not spcified), # Ev:117548 self.adjust = None self.force = None self.cleanup = True # Default value for -cleanup self.max_retries = None # What used specified for -max_retries. # Default: use SCHRODINGER_MAX_RETRIES or 2. self._has_started = False self.restart_from_beginning = False # Ev:83981
[docs] def setOptions( self, subjob_hosts=None, # hosts to run subjobs on. Dictionary # key: product pool; value: host list # must include "general" host list njobs=None, # number of subjobs to create adjust=None, # whether to continue with jobs in the event of subjob # failure force=None, # whether to adjust njobs to a reasonable # cleanup=None, # should each stage cleanup after itself max_retries=None, # how many times to restart subjobs ): """ Set the options of the pipeline. Call this function before calling pipeline.run() to set hosts/njobs/etc. When restarting, call this function to modify the options. """ # Change the options only when new values have been supplied. # Otherwise run as previously (when restarting) if subjob_hosts is not None: for host_pool, hoststr in subjob_hosts.items(): # For every non-queued host, if NCPUS was NOT specified (None), # set it to 1. Leave it as None (unlimited) for queued hosts: try: hostlist = jobcontrol.host_str_to_list(hoststr) except: raise RuntimeError("Invalid host entry: '%s'" % hoststr) hostlist2 = [] for (hostname, ncpus) in hostlist: if ncpus is None and not _host_is_queue(hostname): ncpus = 1 hostlist2.append((hostname, ncpus)) self.host_dict[host_pool] = hostlist2 if njobs is not None: self.njobs = njobs print('Setting njobs to:', njobs) if adjust is not None: print('Setting adjust to:', adjust) self.adjust = adjust if force is not None: print('Setting force to:', force) self.force = force if cleanup is not None: print('Setting cleanup to:', cleanup) self.cleanup = cleanup if max_retries is not None: print('Setting max_retries to:', max_retries) self.max_retries = max_retries print('')
[docs] def readNewFormatVariable(self, varname, keywords): var_class_name = None data = {} var_class_module = None for key, value in keywords.items(): if key.endswith(':'): # For backwards compatability key = key[:-1] if key == "VARCLASS": s = value.split('.') var_class_name = s[-1] var_class_module = ".".join(s[:-1]) else: data[key] = value if key == "FILE": filename = value runtime_filename = jobcontrol.get_runtime_path(filename) if not os.path.exists(runtime_filename): print("ERROR: File does not exist: %s" % runtime_filename) sys.exit(1) data[key] = runtime_filename elif key == "FILES": if isinstance(value, type([])): filenames = value else: filenames = [value] runtime_filenames = [] for filename in filenames: runtime_filename = jobcontrol.get_runtime_path(filename) if not os.path.exists(runtime_filename): print("ERROR: File does not exist: %s" % runtime_filename) sys.exit(1) runtime_filenames.append(runtime_filename) data[key] = runtime_filenames else: # PATH, etc. data[key] = value # It is not wise to use exec(), therefore using a method # Described in Python Cookbook (2002, section 15.3) if var_class_module is None: print("ERROR: No VARCLASS keyword for variable %s" % varname) sys.exit(1) if not var_class_module: var_class_module = "schrodinger.pipeline.pipeio" # Will raise ImportError if can't import: VariableClass = importName(var_class_module, var_class_name) # FIXME Add ability to set data as a dict data_value = None for key, value in data.items(): if key in ["FILE", "PATH", "FILES"]: # Need to specify input files as abolute paths, so that they # aren't copied when stage is launched, and so that they can # be referenced from stage job w/o modifying paths. if type(value) is list: data_value = [os.path.abspath(path) for path in value] else: data_value = os.path.abspath(value) if data_value is None: print("ERROR: Data string should begin with FILE, DATA, or PATH.") err = "ERROR: Data type %s not recognized for variable: %s" % ( key, varname) log(err) sys.exit(1) # Create an instance of the VariableClass: try: var_obj = VariableClass(data_value) except Exception as err: print("ERROR:", err) sys.exit(1) # Make sure that the data is valid (files exist, etc): try: var_obj.check() except RuntimeError as err: print("ERROR:", err) sys.exit(1) self._addVariable(var_obj, varname)
[docs] def readNewFormatStage(self, stagename, keywords): stage_class = None inIOnames = [] outIOnames = [] # List of (keyword, value) tuples, where value is in string format key_list = [] for key, value in keywords.items(): if key == "STAGECLASS": # So that old stage names continue to work: stage_class = old_stage_classes.get(value, value) elif key in ["INPUT", "INPUTS"]: if isinstance(value, type([])): inIOnames += value else: inIOnames += value.split() elif key in ["OUTPUT", "OUTPUTS"]: if isinstance(value, type([])): outIOnames += value else: outIOnames += value.split() else: # A keyword # value will be a list, string, or a dict (section): if isinstance(value, configobj.Section): # Ev:87429 ConfigObj sections can not be safely pickled: value = dict(value) key_list.append((key, value)) if not stage_class: err = "Error: No STAGECLASS specified for stage %s." % stagename log(err) raise RuntimeError(err) stagename = '%s-%s' % (self.jobname, stagename) self.createStage(stagename, inIOnames, outIOnames, stage_class, key_list)

    def _addUserOut(self, value):
        value = str(value)  # just in case
        self._user_outputs.add(value)

    def _setStructOut(self, value):
        value = str(value)  # just in case
        self._structure_output = value
[docs] def readNewFormat(self, command_file): config = inputconfig.InputConfig(command_file) for sectionname, section in config.items(): if sectionname.startswith('STAGE:'): stagename = sectionname.split(':')[1] keywords = dict(section) # Ev:87429 self.readNewFormatStage(stagename, keywords) elif sectionname.startswith('SET:'): varname = sectionname.split(':')[1] self.readNewFormatVariable(varname, section) elif sectionname == 'USEROUTS': userouts = section['USEROUTS'] if not isinstance(userouts, type([])): # string userouts = userouts.split() for userout in userouts: self._addUserOut(userout) structout = section.get('STRUCTOUT') if structout: self._setStructOut(structout)
[docs] def readFile(self, command_file): """ Read a Pipeline input file. :raise RuntimeError: Raised if there is a problem with input file. """ # Determine if new or old format input file is specified: configobj_format = False with open(command_file) as fh: for line in fh: if line.strip().startswith('['): configobj_format = True break if configobj_format: self.readNewFormat(command_file) else: msg = "ERROR: Invalid input file. This version of VSW does not support this file format." print(msg) sys.stdout.flush() log(msg) raise RuntimeError(msg) self.inputsForStagesDefined() self.checkUserOutputs() for sjob in self.stagejobs: sjob.stageobj.initNonPersistent(self)
[docs] def checkUserOutputs(self): """ Make sure that all specified user outputs are variable names that are returned by a stage. This is done to fail on typos in input file. :raise RuntimeError: Raised on invalid USEROUT name. """ # Initialize to input object names: output_varnames = [] for fullname in list(self._objects): output_varnames.append(fullname) for stagejob in self.stagejobs: for varname in stagejob.stageobj.getOutputNames().values(): output_varnames.append(varname) for shortvarname in self._user_outputs: fullname = '%s-%s' % (self.jobname, shortvarname) if fullname not in output_varnames: raise RuntimeError("Invalid USEROUT: %s" % shortvarname) if self._structure_output is not None: fullname = '%s-%s' % (self.jobname, self._structure_output) if fullname not in output_varnames: raise RuntimeError("Invalid STRUCTOUT: %s" % self._structure_output)
[docs] def createStage(self, stagename, inIOnames, outIOnames, stage_class, keywords): """ Create a stage object and add it to the pipeline. :param stagename: Name of the stage. :param inIOnames: Input pipeio object names. :param outIOnames: Output pipeio object names. :param stage_class: module.class defining the stage. :type keywords: list :param keywords: All keywords for the stage, a list of (keyword, value) tuples. :raise RuntimeError or ImportError: Raised on input file error. """ for s in self.stagejobs: if s.stageobj.stagename == stagename: raise RuntimeError("Duplicate stage:" + stagename) s = stage_class.split('.') stage_class_name = s[-1] stage_class_module = ".".join(s[:-1]) if not stage_class: raise RuntimeError("ERROR: No class specified for stage " + stagename) if not stage_class_module: # Use default stage module: raise RuntimeError("ERROR: No stage module specified for stage " + stagename) # Import the class dynamically to avoid exec(): # Will raise ImportError if can't import: try: StageClass = importName(stage_class_module, stage_class_name) except: tmp = "schrodinger.pipeline.stages.%s" % stage_class_module # Will raise ImportError if can't import: try: StageClass = importName(tmp, stage_class_name) except: raise ImportError("Could not import %s" % stage_class) stage_class_module = tmp # Create an instage of the StageClass: self_weakref = weakref.proxy(self) # weak reference to pipeline driver_dir = os.getcwd() stageobj = StageClass(stagename, inpipeline=True, driver_dir=driver_dir) # Will work for new and old formats: for keyname, value in keywords: if keyname.endswith('_FILE') and value: # Ev:68946 Any file specified as a local path will be invalid # when we enter the stage's directory, so make it absolute: if DEBUG: print("DEBUG: Converting to absolute path: %s" % value) value = os.path.abspath(value) stageobj[keyname] = value for key, varname in enumerate(inIOnames): fullname = '%s-%s' % (self.jobname, varname) stageobj.setInput(key + 1, name=fullname) for key, varname in enumerate(outIOnames): fullname = '%s-%s' % (self.jobname, varname) stageobj.setOutputName(key + 1, fullname) stagejob = StageJob(stageobj, self_weakref, stage_class_module) self.stagejobs.append(stagejob) # Ev:45421 make sure all products that stage requires are installed: stageobj.checkProducts()
def _addVariable(self, var_obj, varname): """ Add an initiation variable to the pipeline. :raise RuntimeError: Raised if there is a problem with the variable name or if the variable name is already used. """ for letter in varname: if not letter.isalnum() and letter not in ['_', '-']: msg = "pipeline.py: varname must contain only alpha-numeric characters, dashes, and underscores." msg += "\n invalid variable name: %s" % varname raise RuntimeError(msg) fullname = '%s-%s' % (self.jobname, varname) if fullname in list(self._objects): raise RuntimeError("Duplicate variable:" + fullname) var_obj.name = varname self._objects[fullname] = var_obj self._objects[fullname].check()
[docs] def reportParameters(self): """ Print the parameters of each stage. """ #### Move these to Stage.run() ############################# log("General subjob hosts: %s" % str(self.host_dict['general'])) for host_pool, hosts in self.host_dict.items(): if host_pool != 'general': log("%s hosts: %s" % (host_pool, hosts)) if not self.njobs: log("Number of subjobs: Same as number of processors") else: log("Number of subjobs: %i" % self.njobs) if self.adjust is None: log("Adjust subjobs: False") # log( "Adjust subjobs: Only if neither #subjobs nor #CPUs are # specified") else: log("Adjust subjobs: %s" % str(self.adjust)) force_msg = ("Force job: Glide Docking job will continue in the event" " of Glide subjob failure (merge output from successful" " subjobs).") if not self.force: force_msg = ("No force job: Will terminate upon first docking" " subjob failure instead of merging output from" " successful Glide subjobs.") log(force_msg) # Determine what the stages will use for max_retries: if self.max_retries is None: # User did not specify -max_retires: env_max_retries = os.getenv('SCHRODINGER_MAX_RETRIES') if env_max_retries: max_retries = int(env_max_retries) else: max_retries = 2 else: max_retries = self.max_retries log("Maximum retries: %i" % max_retries) log("Cleanup: %s" % str(self.cleanup)) for stagejob in self.stagejobs: log('') stagejob.stageobj.reportParameters(global_logfh) log('') log("Status of stages:") log(" Stage Name Status") log(" ------------------------------ ---------") for stagejob in self.stagejobs: s = " %-30s %s" % (stagejob.stageobj.stagename, stagejob.status) log(s) log('')
[docs] def inputsForStagesDefined(self): """ Check if the inputs for all stages are either specified in the input variables or are outputs from other stages. :raise RuntimeError: Raised if inputs are not defined, as this indicates the input file is invalid. """ output_varnames = [] for stagejob in self.stagejobs: for varname in stagejob.stageobj.getOutputNames().values(): output_varnames.append(varname) initial_objects = list(self._objects) for stagejob in self.stagejobs: for varname in stagejob.stageobj.getInputNames().values(): if varname not in output_varnames + initial_objects: raise RuntimeError("ERROR: Input: %s not defined." % varname)
[docs] def startReadyStages(self): """ Start all stages that are ready to be run. When restarting, start WAITING and RESTARTING stages. When NOT restarting, start only WAITING stages. Return the number of stages that were started (not currently used). """ num_started = 0 for stagejob in self.stagejobs: if not stagejob.readyToRun(self._objects): continue # RESTARTING or WAITING num_started += 1 self._submitStage(stagejob) # Must be called after cd'ing into Pipeline's dir: self.dump() # dump the Pipeline instance return num_started
[docs] def setStageOptions(self, stageobj): """ Propagate the pipeline options (hosts, ncpus, etc) to the specified stage. """ product = stageobj.mainProduct() if product == 'mmod': product = 'macromodel' # Return hosts list specific to the product (if specified); # otherwise the general host list: hosts = self.host_dict.get(product, self.host_dict['general']) if DEBUG: print('DEBUG: Setting stage (%s) [%s] hosts to %s' % (stageobj.stagename, stageobj.mainProduct(), hosts)) sys.stdout.flush() # If max_retries passed to stageobj is None, then # SCHRODINGER_MAX_RETRIES (or 2) is used. stageobj.setJobOptions(hosts, self.njobs, self.adjust, self.force, self.max_retries, self.cleanup)

    def requestCpus(self):
        pass

    def updateStagesCpus(self):
        """
        Send messages to stages (if necessary) telling them how many
        processors to use from each host.
        """
def _submitStage(self, stagejob): """ Start the specified stage. """ # We are in the stage directory during this function execution... ###### ALL PATHS ARE RELATIVE TO STAGE'S DIRECTORY ###### # At this point we are in the stage's directory already # ######################################################### # Ev:52954 # When a stage starts, it is not allcated any processors. # When it needs to use a host/processor, it needs to check it # from the Pipeline (jobdj will do that) by sending a jobcontrol # message to it with the number of processors needed. # Pipeline will respond by sending a message stating how many # processors to use. # When the stage no longer needs as many processors (or needs more) # it sends another message to Pipeline asking for a different number, # after which Pipeline sends a message with updated numbers. sjname = stagejob.stageobj.stagename # Just for shorter lines log_file = sjname + ".log" dump_file = sjname + ".dump" stage_script = sjname + ".py" out_file = sjname + ".out" stagejob._writeStageExecutable(stage_script) stagelauncher = launcher.Launcher( script=stage_script, copyscript=True, # To allow use of custom (not built-in) stages jobname=sjname, host='localhost', prog=self.prog, # Stage program name local=not USING_JOB_SERVER, # For restartability ) # stagejob.stageobj is already updated from the dump file (if exists) # #### # stored in Pipeline, NOT restart file if stagejob.status == RESTARTING: if self.restart_from_beginning: # Ev:83981 print('Restarting stage "%s" from beginning' % sjname) stagejob.status = WAITING elif sjname not in updated_from_dump: # Dump file did not exist for this stage print( 'WARNING: No dump file for stage "%s"; starting from beginning' % sjname) stagejob.status = WAITING if stagejob.status == RESTARTING: # Restarting; use recovered instance: stageobj = updated_from_dump.get(sjname) stageobj.initNonPersistent(self) else: # NOT restarting # Use the not-started instance: stageobj = stagejob.stageobj # Will submit the instance of the stage that was created when the # Pipeline was created. This instance has never run before. This # is because when the stage runs, it never modifies the Pipeline # instance, since it is running in its own process. 
# Set the input objects for the stage and add the files of input # objects to the jobcontrol -in file list: for position, varname in stageobj.getInputNames().items(): var_obj = self._objects[varname] stageobj.setInput(position, obj=var_obj) for f in var_obj.getFiles(): stagelauncher.addInputFile(f) # Set the names for output objects for the stage: for position, varname in stageobj.getOutputNames().items(): stageobj.setOutputName(position, varname) ###### MODIFY THE STAGE INSTANCE - launch options ########## # Any that are default are set to None: self.setStageOptions(stageobj) if os.path.isfile(dump_file): # Dump the modified instance of the stage to tmp file: tmp_file = dump_file + '.tmp' stagejob.pipeline = None with open(tmp_file, "wb") as fh: pickle.dump(stageobj, fh, protocol=2) # weak reference to pipeline stagejob.pipeline = weakref.proxy(self) # Backup the old file & copy new from tmp: dump_bu = dump_file + '.bu' if os.path.isfile(dump_bu): os.remove(dump_bu) # Required for Win os.rename(dump_file, dump_bu) os.rename(tmp_file, dump_file) else: # No need to back up the old instance: with open(dump_file, "wb") as fh: pickle.dump(stageobj, fh, protocol=2) stagelauncher.addScriptArgs([dump_file]) stagelauncher.setStdOutErrFile(log_file) # Now registers the dump file as an input file instead of log file, # because jlaunch.pl now deletes the registered log files when # launching the job. The problem with this approach is that the dump # file will not be copied to to the launch dir at the end of the job. # Dump file is registered as output file as well, at the end of # stage's run(), to be copied back to driver's job dir. stagelauncher.addInputFile(dump_file) stagelauncher.addOutputFile(dump_file) stagelauncher.addOutputFile(out_file) job = stagelauncher.launch(print_jobid=False) stagejob.jobid = job.job_id if stagejob.status == RESTARTING: stagejob.printAction('restarted') else: stagejob.printAction('launched') stagejob.status = RUNNING sys.stdout.flush() def _updateStageStatusRestarting(self): """ Update the status of each stage job. This runs when restarting a pipeline. If a job has failed for any reason, it marks it to be restarted. If a job is still running or submitted, it does nothing. """ for stagejob in self.stagejobs: stagename = stagejob.stageobj.stagename if stagejob.status == FAILED: stagejob.restart('failed') elif stagejob.status == RUNNING: # Stage.stageobj is already UPDATED from the dump file!!! # Was RUNNING, now COMPLETED. if stagejob.stageobj.hasCompleted(): _get_job(self.jobid).download() stagejob.finish() self.dump() self.cleanupIntermediateFiles(stagejob) else: # NOT completed successfully (failed or still running): try: jobobj = _get_job(stagejob.jobid) jobobj.readAgain() except mm.MmException as e: if e.rc == mmjob.MMJOB_FILE_NOT_FOUND: stagejob.restart('missing') else: raise # any other error else: # Have gotten the job object. # The job is under jobcontrol and the last time I # checked was RUNNING: if jobobj.Status in ['running', 'submitted']: print("Stage %s is still %s" % (stagename, jobobj.Status)) pass # The job is still running else: # Failed or completed not successfully: stagejob.restart(jobobj.Status) def _updateStageStatus(self): """ Update the status of each RUNNING stage job object. This is periodically run by the pipeline. It checks the status of RUNNING jobs and updates them if they have completed of failed. 
""" for stagejob in self.stagejobs: if stagejob.status != RUNNING: continue try: jobobj = _get_job(stagejob.jobid) jobobj.readAgain() except mm.MmException as e: if e.rc == mmjob.MMJOB_FILE_NOT_FOUND: stagejob.died('missing') continue else: raise # any other error # Have aquired the job object. # The job is under jobcontrol and the last time I checked was # RUNNING: if jobobj.Status in ["completed", "exited", "stranded", "fizzled"]: try: exitstatus = jobobj.ExitStatus except: pass # No exit status -- most likely still running else: # Got exit status successfully. if exitstatus in ["died", "stranded", "killed", "fizzled"]: stagejob.died(exitstatus) else: # good exit status: # Download the output files of the stage job. # This will allow the updated dump from stage job to be read. _get_job(stagejob.jobid).download() stagejob.updateFromDump(quiet=True) latest_stageobj = updated_from_dump.get( stagejob.stageobj.stagename) if latest_stageobj is None: print( 'ERROR: Stage has completed but could not be updated from the dump file' ) stagejob.died('failed') # Will quit the Pipeline elif not latest_stageobj.hasCompleted(): print( 'ERROR: Stage has completed but stageobj.hasCompleted() is False' ) stagejob.died('failed') # Will quit the Pipeline else: # Stage object completed successfully stagejob.finish() # Was RUNNING, now COMPLETED. self.dump() self.cleanupIntermediateFiles(stagejob) else: pass # The job is still running def __getstate__(self): state_dict = dict(self.__dict__) # Backend objects can't be usefully pickled as they depend on the # jmonitor instance the job was original launched under. state_dict['_backend'] = None return state_dict

    def dump(self):
        """
        Dumps the Pipeline instance to a restart file
        """
        idle_function_bu = self.idle_function

        # Make the object picklable:
        self.idle_function = None
        for stagejob in self.stagejobs:
            stagejob.pipeline = None

        # Dump to restart file (if any):
        if self.restart_file:
            temp_file = self.restart_file + '.tmp'
            with open(temp_file, "wb") as fh:
                pickle.dump(self, fh, protocol=2)
            # VSW-843 Must use force_rename() on Windows
            # (it will replace the existing file too):
            fileutils.force_rename(temp_file, self.restart_file)

        # Run idle callback (if any):
        if idle_function_bu:
            idle_function_bu()

        # Restore to original state:
        self.idle_function = idle_function_bu
        for stagejob in self.stagejobs:
            stagejob.pipeline = weakref.proxy(self)

    def _getMaxRetries(self):
        """
        Unimplemented.
        """

    def getStageByJobid(self, jobid):
        for stagejob in self.stagejobs:
            if stagejob.jobid == jobid:
                return stagejob
        return None
[docs] def handleStageMessages(self): # Read messages from stages: cpus_freed_up = False message = self._backend.nextMessage() while message: s = message.split(None, 3) # s[0] will be "pipeline" if len(s) < 3: print("PIPELINE ERROR: Could not handle message:") print(' "%s"' % message) raise RuntimeError("Cound not handle message: %s" % message) stage_jobid = s[1] total_ncpus_requested = int(s[2]) if len(s) < 4: # hosts list is empty: stage_hosts_str = '' else: stage_hosts_str = s[3] stagejob = self.getStageByJobid(stage_jobid) if not stagejob: print('ERROR: Cound not find stage with jobid: %s' % stage_jobid) sys.stdout.flush() # Update Pipeline's list of currently used hosts from the info # sent to it by the stage: new_hosts = jobcontrol.host_str_to_list(stage_hosts_str) stagejob.setUsedHosts(new_hosts) # Update Pipeline's list of NCPUS requested by this stage: if total_ncpus_requested < stagejob.num_needed_cpus: cpus_freed_up = True if total_ncpus_requested != stagejob.num_needed_cpus: if DEBUG: print('Stage %s requested %i cpu(s) total' % (stagejob.stageobj.stagename, total_ncpus_requested)) stagejob.num_needed_cpus = total_ncpus_requested # Update the list of hosts/cpus that the stage can use: stagejob.updateStageCpus() message = self._backend.nextMessage() if cpus_freed_up: if DEBUG: print('Some CPUs freed up, distributing...') sys.stdout.flush() self.distributeCpus()

    def distributeCpus(self):
        """
        Called when extra CPUs become available (as given back by the stages
        using them). Will distribute the freed-up CPUs to other stages.
        """
        for stagejob in self.stagejobs:
            avail_host_list = stagejob.getUnusedHostList()
            more_cpus_needed = (stagejob.num_needed_cpus >
                                stagejob.num_given_cpus)
            if avail_host_list and more_cpus_needed:
                stagejob.updateStageCpus()
[docs] def run(self, idle_function=None): """ Run the Pipeline. :param idle_function: A routine to call periodically while running the pipeline. :raise RuntimeError: Raised if Pipeline failed for any reason. """ if DEBUG: print("Running in debug mode") self.idle_function = idle_function restarting = self._has_started if restarting: print("Attempting to restart the pipeline...") else: print("Starting the pipeline...") self._has_started = True # Get the backend of this Pipeline driver job: self._backend = jobcontrol.get_backend() if self._backend: print('\nWill listen to "pipeline" messages from stages') self._backend.addMessageName('pipeline') # Tell jobcontrol to periodically copy back the stage log files: # (previously this was done in the startup script, which caused # the jlaunch argument to be way too long) Ev:100401 if self._backend: # We are running under job control for stagejob in self.stagejobs: log_file = stagejob.stageobj.stagename + '.log' self._backend.addLogFile(log_file) num_stages = 0 for stagejob in self.stagejobs: num_stages += 1 # If restarting, the pipeline reference needs to be updated: stagejob.pipeline = weakref.proxy(self) if num_stages == 0: log("ERROR: No stages to run! Please use readFile() to read an input file" ) raise RuntimeError # Update the statuses of stages based on latest jobcontrol info: # Re-throw exception to reduce traceback printouts: try: if restarting: # Will set status of failed stages to RESTARTING # and status of completed stages to COMPLETED: self._updateStageStatusRestarting() else: self._updateStageStatus() except RuntimeError as e: raise RuntimeError(str(e)) self.reportParameters() self.printAction(None, None, None) # ***** PIPELINE STARTED ***** old_statuses = [] while True: # Run this loop until no more stages are running: new_statuses = [(stagejob.status) for stagejob in self.stagejobs] if old_statuses != new_statuses: # Status of any stage changed, start ready stages: self.startReadyStages() self.dump() tmp_statuses = [(stagejob.status) for stagejob in self.stagejobs ] if RUNNING not in tmp_statuses: # No stages are running/ready: break # exit the loop # Check for messages every second (and handle, if any) if self._backend: self.handleStageMessages() # Sleep for 3 seconds - prevents from taking down the system. time.sleep(3) old_statuses = [(stagejob.status) for stagejob in self.stagejobs] try: # get updated status from jobcontrol for each stage: self._updateStageStatus() except RuntimeError as e: raise RuntimeError(str(e)) # ***** PIPELINE FINISHED ***** num_waiting = new_statuses.count(WAITING) num_failed = new_statuses.count(FAILED) if num_failed: # Print the list of stages that have failed: failed_stages = [s for s in self.stagejobs if s.status == FAILED] print("") for stagejob in failed_stages: stagejob.printFailureMessage() print("") self.dump() # Save the state of the pipeline before exiting # Kill the Pipeline, since there are no more stages to run: snames = [sj.stageobj.stagename for sj in failed_stages] raise RuntimeError("Stage(s) failed: %s. Exiting." % ', '.join(snames)) elif num_waiting: print( "No more running stages, %i waiting stages, and none of them are ready." 
% num_waiting) print("User outputs until this point:", self._user_outputs) if self.prog: print(self.prog, end=' ') print("EXITING: no ready stages!") sys.stdout.flush() raise RuntimeError("ERROR: no ready stages!") else: log("All stages have completed sucessfully.") uouts = self.getUserOutputs() structout = self.getStructureOutput() if uouts: output_files = [] incorporatable_file = None log("User outputs:") for obj in uouts: # For each output file, if it's a USEROUT file, move it to # the driver's directory (from stage's directory) and # register it with jobcontrol so that it gets copied back: obj_files = obj.getOutputPaths() for filename in obj_files: output_files.append(filename) # Figure out whether this file needs to be incorporated: is_struct_out = structout and (obj.name == structout.name) if is_struct_out and len(obj_files) == 1: incorporatable_file = obj_files[0] if is_struct_out: msg = " %s (incorporatable) %s" % (obj.name, str(obj)) else: msg = " %s %s" % (obj.name, str(obj)) log(msg) # Make a link (or copy) each output file into the CWD and # add them to the job record: for filename in output_files: # Make a link to the output file to CWD (driver job # directory): directory, local_filename = os.path.split(filename) if os.path.abspath(filename) != os.path.abspath( local_filename): # Only if this file is not already in the CWD, # create a link to it in the CWD. If creating a # link failed for whatever reason, make a copy: # Ev:84675 & Ev:97589 try: fileutils.create_hard_link(filename, local_filename) except: if os.path.isdir(filename): # Typically a Phase directory shutil.copytree(filename, local_filename) else: shutil.copy(filename, local_filename) # Register the link with jobcontrol: if self._backend: # Driver running under jobcontrol # NOTE: For this file to be copied to launch dir # correctly it needs to be in driver's job dir: self._backend.addOutputFile(local_filename) if filename == incorporatable_file: self._backend.setStructureOutputFile(local_filename) sys.stdout.flush()

    def getUserOutputs(self):
        """
        Return a list of pipeio objects that are to be presented to the user
        at the end of the Pipeline run.
        """
        outs = []
        for varname in self._user_outputs:
            fullname = '%s-%s' % (self.jobname, varname)
            try:
                obj = self._objects[fullname]
            except KeyError:
                # Stage did not set this output object
                continue
            obj.name = varname
            outs.append(obj)
        return outs

    def getStructureOutput(self):
        if self._structure_output is None:
            return None
        else:
            fullname = '%s-%s' % (self.jobname, self._structure_output)
            try:
                obj = self._objects[fullname]
            except KeyError:
                # Stage did not set this output object
                return None
            obj.name = self._structure_output
            return obj

    def getUserOutputFiles(self):
        """
        Return a list of files for all user (final) outputs of Pipeline.
        """
        output_files = []
        for obj in self.getUserOutputs():
            output_files.extend(obj.getFiles())
        return output_files
[docs] def printAction(self, stagename, stagejobid, action): """ Print an action for stage `stagename`. :param stagejobid: The jobid of the stage. :param action: The latest action of the stage. """ # m is the width of the #jobs column: # m==1 when #jobs is 1-9, 2 when #jobs is 10-99, 3 when #jobs is # 100-999. m = len(str(len(self.stagejobs))) if stagename is None: log("Stage activity keys...") log(" C: Number of completed stages") log(" A: Number of active/running stages") log(" W: Number of waiting/pending stages") line = "\n%*s %*s %*s | %s" % (m, "C", m, "A", m, "W", "Stage name and activity") log(line) s = "-" * m line = "%*s %*s %*s | %s" % (m, s, m, s, m, s, "-----------------------") log(line) else: new_statuses = [(stagejob.status) for stagejob in self.stagejobs] num_completed = new_statuses.count(COMPLETED) num_running = new_statuses.count(RUNNING) num_waiting = new_statuses.count(WAITING) line = "%*d %*d %*d | %s (%s) %s." % (m, num_completed, m, num_running, m, num_waiting, stagename, stagejobid, action) log(line) sys.stdout.flush()
[docs] def cleanupIntermediateFiles(self, stagejob): """ Remove any stage outputs that are no longer needed. Intermediate files are any outputs of a previously completed stage. They are no longer needed if they are not marked as a USEROUT, and they are not needed as input for any yet-to-be-run stage. """ # Ev:107213 # Ev:125676 if not self.cleanup: return # The <stagjob> is the stage that have just completed. # Make a list of variable names that are still needed: needed_varnames = set() for varname in self._user_outputs: fullname = '%s-%s' % (self.jobname, varname) needed_varnames.add(fullname) for stagejob in self.stagejobs: if stagejob.status != COMPLETED: stageobj = stagejob.stageobj needed_varnames.update(list(stageobj.getInputNames().values())) # for each stages input, if it's not longer used, remove it: for varname in self.intermediate_varnames[:]: if varname not in needed_varnames: # To work-around a bug where GeneratePhaseDB did not list # the output database as a USEROUT: if varname.endswith("-DATABASE"): continue print("Cleaning up intermediate files:", varname) self.intermediate_varnames.remove(varname) files_to_delete = self._objects[varname].getFiles() for filename in files_to_delete: try: os.remove(filename) except Exception as err: print("WARNING: Failed to remove file:", filename) print("Exception:", err) del self._objects[varname]

    def getUsedHosts(self, host_pool):
        """
        Return a dictionary of hosts that are CURRENTLY checked out (used) by
        all stages combined (within specified host_pool).
        """
        used_hosts = {}
        for stagejob in self.stagejobs:
            if stagejob.host_pool == host_pool:
                for hostname, njobs in stagejob.checked_out_hosts:
                    try:
                        used_hosts[hostname] += njobs
                    except KeyError:
                        used_hosts[hostname] = njobs
        return used_hosts


def Restart(restart_file, restartbeg=False):
    """
    Recover a saved Pipeline instance.

    Specify new options only if the settings need to change.

    Returns a Pipeline instance recovered from the restart_file.
    You need to call `pipeline.run()` in order to get the pipeline running.

    :raise RuntimeError: Raised if a Pipeline can't be loaded from the
        specified file.

    :param restartbeg: Whether to start failed stages from beginning.
    """
    if not os.path.isfile(restart_file):
        err = "Pipeline.Restart(): file does not exist: %s" % restart_file
        raise IOError(err)

    log('Pipeline: Loading from a saved state...')

    if not os.path.isfile(restart_file):
        raise RuntimeError("Pipeline.Restart(): restart file does not exist.")

    try:
        with open(restart_file, "rb") as fh:
            pipeline = pickle.load(fh)
    except Exception as err:
        msg = str(err)
        msg += '\n' + \
            "Pipeline.Restart(): could not load job from restart file."
        raise RuntimeError(msg)

    log('Successfully loaded Pipeline from restart file.')

    global updated_from_dump
    updated_from_dump = {}  # Reset the dictionary

    # Update stages from the dump file if not WAITING:
    for stagejob in pipeline.stagejobs:
        if stagejob.status != WAITING:
            stagejob.updateFromDump()
    if updated_from_dump:
        log(' Some stages were updated from dump files')
    log('')  # new-line to separate from pipeline.run() messages

    if restartbeg:  # Ev:83981
        pipeline.restart_from_beginning = True

    return pipeline
# EOF