# Source code for schrodinger.ui.qt.appframework2.tasks

"""
<<<<< DEPRECATED >>>>>
This module should not be used for new code. Instead, consider using
`schrodinger.tasks.tasks`
<<<<< !!!!!!!!!  >>>>>



Task runner classes are designed to be subclassed to define runners for specific
tasks. A "task" is a generic term that encompasses jobs, threads, and subprocess
calls.

"""

from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import jobnames
from schrodinger.ui.qt.appframework2 import validation

# Message-type codes handed to a runner's messaging callback as its first
# argument (see AbstractTaskRunner.error/warning/question/info/status).
ERROR, WARNING, QUESTION, INFO, STATUS = range(5)


#===============================================================================
# Task Wrapper
#===============================================================================
class Status:
    """
    String constants describing the state of a task.

    Within this module, BaseFunctionRunner._runMain() assigns RUNNING, ERROR
    and DONE to a task, and the blocking/thread wrappers start out as NONE.
    """

    NONE = '-'
    RUNNING = 'Running'
    FAILED = 'Failed'
    DONE = 'Done'
    ERROR = 'Error'
class AbstractTaskWrapper(object):
    """
    Provides a common interface for tasks that is independent of the underlying
    task object. The main methods are self.isRunning() and self.status(), plus
    self.getName() and self.setName().

    Subclasses must set TASK_CLASS to the type of the object they wrap and
    implement isRunning().
    """

    # Placeholder meaning "no task class defined"; concrete wrappers override
    # this with the real type of the wrapped object.
    TASK_CLASS = type(None)

    def __init__(self, task, settings=None, name='', test_mode=False):
        """
        :param task: the underlying task object (depends on subclass)
        :type task: see derived class

        :param settings: the settings used to run this task. A shallow copy
            is stored, so later changes to the passed-in dict are not seen.
        :type settings: dict

        :param name: the task name
        :type name: str

        :param test_mode: disables type-checking of the task object. Used for
            mocking the task in tests
        :type test_mode: bool

        :raises RuntimeError: if the class has not overridden TASK_CLASS
        :raises TypeError: if task is not an instance of TASK_CLASS and
            test_mode is off
        """
        # The base class uses type(None) as its placeholder, so comparing
        # against None alone would never detect the abstract class.
        if self.TASK_CLASS is None or self.TASK_CLASS is type(None):
            raise RuntimeError('Cannot instantiate AbstractTaskWrapper.')
        if not test_mode and not isinstance(task, self.TASK_CLASS):
            raise TypeError('Task %s must be of type %s.' %
                            (task, self.TASK_CLASS.__name__))
        self._task = task
        self._status = None
        if settings is None:
            settings = {}
        self._settings = settings.copy()
        self.setName(name)

    def isRunning(self):
        """
        Whether this task is currently running. Must be implemented by the
        derived class.
        """
        raise NotImplementedError()

    def status(self):
        """
        The current status of the task. The schema is flexible and can be agreed
        upon with the corresponding runner.
        """
        return self._status

    def settings(self):
        """
        Returns the settings that were used to run this task.
        """
        return self._settings

    def getName(self):
        """
        Returns the task name.
        """
        return self._name

    def setName(self, name):
        """
        Sets the task name.

        :param name: the new name
        :type name: str
        """
        self._name = name

    def __str__(self):
        return self.getName()

    def __repr__(self):
        return '%s: %s' % (self.__class__.__name__, self.getName())
#===============================================================================
# Task Runner Base Class
#===============================================================================
class AbstractTaskRunner(validation.ValidationMixin, QtCore.QObject):
    """
    Base class for objects that start tasks and keep track of them.

    Derived classes implement _start() to launch a task with their specific
    mechanism; the public start() method wraps it with settings retrieval,
    validation, bookkeeping and signal emission.
    """

    # Emitted by start() after a task is registered and by _update() when at
    # least one task has finished.
    stateChanged = QtCore.pyqtSignal()
    # Emitted at the very beginning of start().
    startRequested = QtCore.pyqtSignal()
    # Emitted by start() when pre-flight or _start() fails or raises.
    startFailed = QtCore.pyqtSignal()
    # Emitted by start() with the newly started task.
    taskStarted = QtCore.pyqtSignal(AbstractTaskWrapper)
    # Emitted by _update() with each task that is no longer running.
    taskEnded = QtCore.pyqtSignal(AbstractTaskWrapper)
    # Emitted by setCustomName() when a custom name is accepted.
    nameChanged = QtCore.pyqtSignal()
    # Emitted by resetAll().
    resetAllRequested = QtCore.pyqtSignal()

    def __init__(self, messaging_callback=None, settings_callback=None):
        """
        Initializes a new task runner

        :param messaging_callback: callback used for interacting with the user.
            This includes both reporting results or errors and asking questions
            to the user. The callback should be of the form:

            f(message_type, text, options=None, runner=None)

            where message_type is one of ERROR, WARNING, QUESTION, INFO or
            STATUS, text is the text of the message to be displayed, and
            options is an optional dict which can be used by the callback. For
            example, a dialog title could be passed to the callback via the
            options dict. The callback should never depend on the presence or
            absence of any options. The runner is simply the runner that is
            making the call, so that the callback can tell which runner is
            invoking the callback.

            When the message is a question, the function should return the
            user's response, typically True or False for a yes/no question.
        :type messaging_callback: function

        :param settings_callback: callback for communicating state with the
            parent object. This callback can be used both to push state to or
            pull state from the parent object. The callback should be of the
            form:

            f(settings=None, runner=None)

            If no settings are passed in, the callback should return the state
            of the parent object (i.e. the panel state) in the form of a
            dictionary. The runner can be passed in as well if the callback
            needs to access the runner to properly process the callback.

            If a settings dictionary is passed in, the callback should apply
            any settings in the dictionary to the parent object (thus altering
            its state). Passing in an empty dictionary is a no-op.
        :type settings_callback: function
        """
        QtCore.QObject.__init__(self)
        validation.ValidationMixin.__init__(self)
        self.setCallbacks(messaging_callback, settings_callback)
        self._settings = self.defaultSettings()
        self.active_tasks = []
        self.past_tasks = []
        self.tasks_by_name = {}
        self.custom_name = None
        # NOTE(review): history_length is stored here but the code in this
        # class never trims past_tasks to it - confirm whether subclasses do.
        self.history_length = 5
        self.allow_concurrent = True
        self.base_name = 'task'
        self.runner_name = None
        self.allow_custom_name = True
        # Give the subclass a chance to override the defaults set above.
        self.setRunnerOptions()
        if self.runner_name is None:
            self.runner_name = self.base_name

    def setCallbacks(self, messaging_callback=None, settings_callback=None):
        """
        Installs the messaging and settings callbacks. Any callback that is
        not supplied is replaced with a function that accepts any arguments
        and returns an empty dict.

        :param messaging_callback: see __init__
        :type messaging_callback: function

        :param settings_callback: see __init__
        :type settings_callback: function
        """

        def no_op(*args, **kwargs):
            return {}

        if messaging_callback is None:
            messaging_callback = no_op
        if settings_callback is None:
            settings_callback = no_op

        # TODO: use weakref? callbacks disappeared when I tried.
        self._settingsCallback = settings_callback
        self._messagingCallback = messaging_callback

    def setRunnerOptions(self):
        """
        Optional override to set options for the runner. Not overriding this at
        all results in using all default values.

        self.allow_custom_name - whether name is user-editable. Default: True

        self.allow_concurrent - whether another task can be started while one
        is still running. Default: True

        self.history_length - how many past jobs to keep track of. Default: 5

        self.base_name - the base for task names. The base name gets modified
        to generate unique task names. Ex. MyTask_3. Default: "task"

        self.runner_name - a name to describe the type of task. Equivalent to
        program_name for jobs. Default: the value of self.base_name
        """

    def nextName(self, name_list=None):
        """
        Returns the name that will be assigned to the next task that gets run.
        There is no currentName(), as multiple tasks might be running
        concurrently. To get the name of an existing task, use task.getName().

        If a custom name has been set, that will be used as the next name.
        Otherwise, the base name will be used to generate a new unique name.

        This method can be overridden to alter the task naming behavior.

        :param name_list: Optional list of names to uniquify against. If not
            given (or empty), the name will be compared against the names
            returned by self._getTakenNames()
        :type name_list: list of basestring

        :return: the next task name, or an empty string if neither a base
            name nor a custom name is set
        :rtype: str
        """
        if not self.base_name and not self.custom_name:
            return ''

        # Determine the next name
        if not name_list:
            name_list = self._getTakenNames()
        next_name = jobnames.update_jobname(self.base_name,
                                            self.base_name,
                                            name_list=name_list)

        # If the custom name matches, reset
        if self.custom_name == next_name:
            self.custom_name = None

        if self.custom_name:
            return self.custom_name
        return next_name

    def setCustomName(self, name):
        """
        Sets a custom name for the next task to be run. Does nothing if
        self.allow_custom_name is off.

        :param name: the custom name. Pass in an empty string to return to
            standard naming.
        :type name: str
        """
        name = str(name)  # This value is usually unicode from a QLineEdit
        if not self.allow_custom_name:
            return
        self.custom_name = name
        self.nameChanged.emit()

    def preValidate(self):
        """
        Override this to include any logic that should be run prior to the
        validation step.

        :return: Whether this step has succeeded. Returning False will result in
            aborting the task
        :rtype: bool
        """
        return True

    def postStart(self, task):
        """
        Override this to include any logic that should be run immediately after
        a task is started. This will only be run after a task actually starts.

        The started task is passed in as a parameter to allow interaction with
        the task instance. Note that there is no guarantee that the task is
        still running when this method is called.

        :param task: the task that was just started
        :type task: AbstractTaskWrapper
        """

    def postProcess(self, task):
        """
        Override this to include any logic that should be run whenever a task
        completes. This method is called whenever a task stops running, whether
        it succeeded, failed, or encountered an error.

        The completed task is passed in as a parameter to allow querying and
        modification of the task instance.

        There is currently no mechanism for ensuring this logic gets run between
        maestro sessions. If a session is closed while the job is running, this
        method will never be called. Use this method to perform only actions
        that make sense in the context of a single session.

        :param task: the task that has ended
        :type task: AbstractTaskWrapper
        """

    def start(self):
        """
        Starts the task. This includes the preliminary work of calling
        preValidate() and running validation before attempting to actually start
        the task itself.

        The actual starting of the task should be handled in the _start method
        in the derived classes and will vary depending on the type of runner.

        :return: True if a task was started, False if pre-flight failed or
            _start() returned nothing
        :rtype: bool
        """
        self.startRequested.emit()

        # Deliberately broad: the handler only announces the failure and then
        # re-raises whatever was caught.
        try:
            result = self._preFlight()
        except BaseException:
            self.startFailed.emit()
            raise
        if not result:
            self.startFailed.emit()
            return False

        try:
            task = self._start()
        except BaseException:
            self.startFailed.emit()
            raise
        if not task:
            self.startFailed.emit()
            return False

        self.addTask(task)
        self.taskStarted.emit(task)
        self.stateChanged.emit()
        self.postStart(task)
        return True

    def _start(self):
        """
        Should be implemented by derived class. Whereas the public start()
        method performs the preliminary tasks of validation and job setup, as
        well as tracking the started task, this method, _start(), should only
        contain the code to start the task using the mechanism specific to the
        type of runner (i.e. start a thread, launch a job, create a subprocess,
        etc.)

        :return: the task that has been started, or None, if the task could not
            be started.
        :rtype: AbstractTaskWrapper
        """
        raise NotImplementedError()

    #===========================================================================
    # Task list
    #===========================================================================

    def names(self):
        """
        Returns the names of all tasks tracked by this runner.

        :rtype: list of str
        """
        return list(self.tasks_by_name)

    def _getTakenNames(self):
        """
        Returns a list of names that are already taken. By default, this is just
        the names of all the tasks started by this runner; however, this method
        may be overridden to include other names as well (for example, all the
        dir names in the cwd, so that jobs will not accidentally overwrite each
        other).
        """
        return self.names()

    def tasks(self):
        """
        Returns all tasks tracked by this runner.

        :rtype: list of AbstractTaskWrapper
        """
        return list(self.tasks_by_name.values())

    def addTask(self, task):
        """
        Add a new task to be tracked. This should be called whenever a task is
        started. The task is only added to self.active_tasks if it reports
        that it is running.

        :param task: the task
        :type task: AbstractTaskWrapper
        """
        name = task.getName()

        # we can try and set the name here, but it might be too late if the
        # task has already retrieved its name.
        if not name:
            name = self.nextName()
            task.setName(name)

        self.tasks_by_name[name] = task
        if task.isRunning():
            self.active_tasks.append(task)

    def findTask(self, name):
        """
        Returns the tracked task with the given name, or None if there is none.

        :param name: the task name to look up
        :type name: str
        """
        return self.tasks_by_name.get(name)

    #===========================================================================
    # Messages
    #===========================================================================

    def showMessage(self, message_type, text, options=None):
        """
        Communicates with the parent object via the messaging_callback. This
        method generally doesn't need to be called; call error, warning,
        question, or info instead.

        :param message_type: the type of message to send
        :type message_type: int

        :param text: the main text of the message
        :type text: str

        :param options: a dictionary of other options to be processed by the
            messaging_callback.
        :type options: dict

        :return: whatever the messaging callback returns
        """
        # Pass the runner like every other messaging helper does, so the
        # callback can tell which runner is invoking it.
        return self._messagingCallback(message_type,
                                       text,
                                       options,
                                       runner=self)

    def error(self, text, caption='Error'):
        """
        Sends an ERROR message with the given text and caption.
        """
        return self._messagingCallback(ERROR,
                                       text, {'caption': caption},
                                       runner=self)

    def warning(self, text, caption='Warning'):
        """
        Sends a WARNING message with the given text and caption.
        """
        return self._messagingCallback(WARNING,
                                       text, {'caption': caption},
                                       runner=self)

    def question(self, text, caption='Question'):
        """
        Sends a QUESTION message and returns the callback's response.
        """
        return self._messagingCallback(QUESTION,
                                       text, {'caption': caption},
                                       runner=self)

    def info(self, text, caption='Info'):
        """
        Sends an INFO message with the given text and caption.
        """
        return self._messagingCallback(INFO,
                                       text, {'caption': caption},
                                       runner=self)

    def status(self, text, timeout=3000, color=None):
        """
        Request a status message to be displayed by the runner's parent.

        :param text: the text to display
        :type text: str

        :param timeout: duration in ms to display the status. A timeout of 0
            results in a permanent message.
        :type timeout: int

        :param color: color of the status message.
        :type color: QtGui.QColor
        """
        options = {'timeout': timeout, 'color': color}
        return self._messagingCallback(STATUS, text, options, runner=self)

    def updateStatusText(self):
        """
        Override this to update the status, for example, when settings have
        changed or the current task runner is switched.
        """

    #===========================================================================
    # Task settings - retrieving the specifications for a job
    #===========================================================================

    def pullSettings(self):
        """
        This method calls the settings callback, which should return the user's
        input for this job, such as input files, options, etc. For GUI panels,
        this is how the panel state is applied to the job runner.
        """
        settings = self._settingsCallback(runner=self)
        self._settings.update(settings)

    def pushSettings(self, settings=None):
        """
        Pushes a settings dictionary via the settings callback. Doing this will
        alter the state of the parent object (generally the panel). This
        function can be used to reset the panel or load saved presets.

        If a settings dictionary is not passed in, the current job settings will
        be used.

        :param settings: a settings dictionary to push to the parent object
        :type settings: dict
        """
        if settings is None:
            settings = self._settings
        self._settingsCallback(settings, runner=self)

    def settings(self):
        """
        Returns the runner's current settings dictionary (not a copy).
        """
        return self._settings

    def defaultSettings(self):
        """
        Override this method to define default values for any settings. This
        dictionary of default settings will be used to reset the parent.
        """
        return {}

    def resetAll(self):
        """
        Emits resetAllRequested.
        """
        self.resetAllRequested.emit()

    def reset(self):
        """
        Resets the parent object using the default settings defined by the
        task runner.
        """
        self._settings = self.defaultSettings()
        self.pushSettings()

    #===========================================================================
    # Internal use
    #===========================================================================

    def _preFlight(self):
        """
        Runs every check that must pass before a task is started: the
        concurrency check, pulling settings, preValidate() and validation.

        :return: whether the task may be started
        :rtype: bool
        """
        if not self.allow_concurrent and self.isRunning():
            self.error('Please wait until the last task is complete.')
            return False
        self.pullSettings()
        # Explicit comparison on purpose: only a False-equal return aborts,
        # so an override that returns None still lets the task proceed.
        if self.preValidate() == False:  # noqa: E712
            return False
        if not self.runValidation():
            return False
        return True

    def _update(self):
        """
        Moves finished tasks from active_tasks to past_tasks, calling
        postProcess() and emitting taskEnded for each, then emits stateChanged
        if anything finished.
        """
        # PANEL-14099: Linux machines may return task.isRunning() True, while
        # task.status() is DONE. Tested in GUI, this bandage works.
        completed_tasks = [
            task for task in self.active_tasks
            if (not task.isRunning() or task.status() == Status.DONE)
        ]
        for task in completed_tasks:
            self.active_tasks.remove(task)
            self.past_tasks.append(task)
            self.postProcess(task)
            self.taskEnded.emit(task)
        if completed_tasks:
            self.stateChanged.emit()

    def reportValidation(self, results):
        """
        Present validation messages to the user. This is an implementation of
        the ValidationMixin interface and does not need to be called directly.

        :param results: Set of results generated by validate()
        :type results: ValidationResults

        :return: False if a failed result was reported or the user declined a
            warning question; True otherwise
        :rtype: bool
        """
        for result in results:
            if not result:
                message = result.message
                if not message:
                    message = 'Validation failed. Check settings and try again.'
                self.error(message)
                return False
            else:
                # A passing result that carries a message is treated as a
                # warning the user must confirm.
                if result.message:
                    cont = self.question(result.message, caption='Warning')
                    if not cont:
                        return False
        return True

    def isRunning(self):
        """
        Whether this runner has any active tasks.
        """
        return bool(self.active_tasks)

    def __str__(self):
        return '%s.%s' % (self.__module__, self.__class__.__name__)
#===============================================================================
# Base Function Runner
#===============================================================================
class BaseFunctionRunner(AbstractTaskRunner):
    """
    Base class for runners whose task is a Python callable. The task logic
    is either supplied as a callable at instantiation or defined by
    implementing runMain(). A supplied callable takes precedence over any
    runMain() implementation.
    """

    def __init__(self,
                 func=None,
                 messaging_callback=None,
                 settings_callback=None):
        """
        :param func: the callable that will be run as the main task. It is
            stored as self.runMain on the instance and therefore replaces any
            runMain() implementation; it is called with the task wrapper as
            its only argument. If self.runMain() is not defined, a func must
            be provided.
        :type func: callable

        :param messaging_callback: passed through to AbstractTaskRunner
        :type messaging_callback: function

        :param settings_callback: passed through to AbstractTaskRunner
        :type settings_callback: function
        """
        AbstractTaskRunner.__init__(self,
                                    messaging_callback=messaging_callback,
                                    settings_callback=settings_callback)
        if func is None:
            return
        self.runMain = func

    def _runMain(self, task):
        """
        Runs runMain() on behalf of a task and takes care of the bookkeeping
        around it: the task status is updated, a status message is sent and
        an _update() call is queued.

        If runMain() raises, the task is marked Status.ERROR, _update() is
        queued and the exception propagates. Otherwise the task status is
        the value returned by runMain(), or Status.DONE if it returned None.

        :param task: the wrapped task object
        :type task: AbstractTaskWrapper
        """
        task._status = Status.RUNNING
        try:
            outcome = self.runMain(task)
        except BaseException:
            # Record the failure and queue the bookkeeping, then let the
            # original exception continue.
            task._status = Status.ERROR
            QtCore.QTimer.singleShot(0, self._update)
            raise

        task._status = Status.DONE if outcome is None else outcome

        if task.status() == Status.DONE:
            message = 'Task completed'
        else:
            message = 'Task status: %s' % task.status()
        self.status(message)

        # Singleshot timer so that _update is called after thread is finished
        QtCore.QTimer.singleShot(0, self._update)

    def runMain(self, task):
        """
        The main task logic. Must be implemented by the derived class unless
        a callable was passed to __init__.

        :param task: the wrapped task object
        :type task: AbstractTaskWrapper

        :return: a status value for the task, or None to mean Status.DONE
        """
        raise NotImplementedError()
#===============================================================================
# Blocking Runner
#===============================================================================
class BlockingRunner(BaseFunctionRunner):
    """
    Runner that executes its main function synchronously, i.e. start() does
    not return until the function has finished. Useful for quick
    calculations.

    Either subclass it and put the main logic in runMain(), or use it
    directly by passing in a callable.
    """

    def _start(self):
        """
        Creates an empty wrapper carrying the current settings, runs the main
        function to completion on it and returns the wrapper.
        """
        wrapper = BlockingWrapper(settings=self.settings())
        self._runMain(wrapper)
        return wrapper
class BlockingWrapper(AbstractTaskWrapper):
    """
    A blocking call has no underlying task object, so this wrapper holds a
    throwaway dict and exists only to give the runner the interface it
    expects.
    """

    TASK_CLASS = dict  # Just need something other than None

    def __init__(self, settings=None, name='', **kwargs):
        """
        :param settings: the settings used to run this task
        :type settings: dict

        :param name: the task name
        :type name: str

        Any further keyword arguments are forwarded to
        AbstractTaskWrapper.__init__.
        """
        # The placeholder task only has to be an instance of TASK_CLASS.
        placeholder = {}
        AbstractTaskWrapper.__init__(self,
                                     placeholder,
                                     settings=settings,
                                     name=name,
                                     **kwargs)
        self._status = Status.NONE

    def isRunning(self):
        """
        Always False for a blocking wrapper.
        """
        return False
#===============================================================================
# Thread Runner
#===============================================================================
class ThreadRunner(BaseFunctionRunner):
    """
    Runs tasks in a QThread. Either subclass it and put the logic to be run
    by the thread in runMain(), or use it directly by passing in a callable.
    Runner options can be set by overriding setRunnerOptions(). See parent
    class for more information.
    """

    # When True, a local QEventLoop is run while this runner has active
    # tasks and exited once none remain.
    use_event_loop = False

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as BaseFunctionRunner and additionally
        creates the QEventLoop used when use_event_loop is on.
        """
        super().__init__(*args, **kwargs)
        self.event_loop = QtCore.QEventLoop()

    def _start(self):
        """
        Implements the creation and starting of the thread.

        :return: the wrapper around the started thread
        :rtype: ThreadWrapper
        """
        worker = QtCore.QThread()
        wrapper = ThreadWrapper(worker,
                                settings=self.settings(),
                                name=self.nextName())
        wrapper._status = Status.NONE

        def run_task():
            return self._runMain(wrapper)

        # Replace the thread's run() so that starting it executes the task.
        worker.run = run_task
        worker.start()
        return wrapper

    def addTask(self, task):
        """
        Tracks the task and, if use_event_loop is on and there are active
        tasks, queues the start of the local event loop.

        :param task: the task
        :type task: AbstractTaskWrapper
        """
        super().addTask(task)
        if not (self.use_event_loop and self.active_tasks):
            return
        # we need to use a singleshot because starting the event loop will
        # block any code that follows it
        QtCore.QTimer.singleShot(0, self._startEventLoop)

    def _startEventLoop(self):
        """
        Runs the local event loop unless it is already running.
        """
        if self.event_loop.isRunning():
            return
        self.event_loop.exec()

    def _update(self):
        """
        Performs the base-class bookkeeping, then exits the local event loop
        if use_event_loop is on and no active tasks remain.
        """
        super()._update()
        if self.use_event_loop and not self.active_tasks:
            self.event_loop.exit()
class ThreadWrapper(AbstractTaskWrapper):
    """
    Presents a QtCore.QThread through the common task wrapper API.
    """

    TASK_CLASS = QtCore.QThread

    def isRunning(self):
        """
        Whether the wrapped thread reports that it is running.
        """
        thread = self._task
        return thread.isRunning()