Source code for schrodinger.tasks.queue

import enum
import functools
import os
import time

from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.job.queue import NOLIMIT
from schrodinger.Qt import QtCore
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import mmutil
from schrodinger.utils import qt_utils
from schrodinger.utils import scollections

#===============================================================================
# Task Queue
#===============================================================================


class TaskQueue(tasks.SignalTask):
    """
    A task that runs a queue of tasks. The TaskQueue is done when all its
    added tasks have completed, regardless of whether they completed
    successfully or failed.

    To use, add tasks with addTask and then start the task queue.
    """
    queuedTaskFinished = QtCore.pyqtSignal(object)
    queueDone = QtCore.pyqtSignal()

    max_running_tasks: int = 4

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.taskDone.connect(self.queueDone)
        self._running_tasks = scollections.IdSet()
        self._waiting_tasks = []
        self._tasks = []
        self._stop_on_empty_queue = True
        self._done_adding = False

    def addTask(self, task):
        self._tasks.append(task)
        self._waiting_tasks.append(task)
        if self.isRunning():
            self._startMoreTasks()

    def getTasks(self):
        """
        Return all tasks in the queue.

        :rtype: tuple[tasks.AbstractTask]
        """
        return tuple(self._tasks)

    def setUpMain(self):
        self._running_tasks = scollections.IdSet()
        self._waiting_tasks = list(self._tasks)
        self._startMoreTasks()

    @tasks.SignalTask.guard_method
    def _onTaskFinished(self):
        task = self.sender()
        self._running_tasks.remove(task)
        self.queuedTaskFinished.emit(task)
        self._startMoreTasks()

    @tasks.SignalTask.guard_method
    def _startMoreTasks(self):
        if not self._running_tasks and not self._waiting_tasks:
            if self._stop_on_empty_queue:
                self.mainDone.emit()
        while len(self._running_tasks) < self.max_running_tasks:
            try:
                task = self._waiting_tasks.pop(0)
            except IndexError:
                break
            task.taskDone.connect(self._onTaskFinished)
            task.taskFailed.connect(self._onTaskFinished)
            self._running_tasks.add(task)
            QtCore.QTimer.singleShot(0, task.start)
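
# A minimal usage sketch (illustrative, not part of the module):
# `ExampleTask` stands in for any startable task class with taskDone /
# taskFailed signals, such as a tasks.SubprocessMixin subclass.
#
#     task_queue = TaskQueue()
#     task_queue.max_running_tasks = 2  # throttle to two concurrent tasks
#     for idx in range(5):
#         task = ExampleTask()
#         task.name = f'example_{idx}'
#         task_queue.addTask(task)
#     task_queue.queuedTaskFinished.connect(
#         lambda task: print(f'{task.name}: {task.status}'))
#     task_queue.start()
#     task_queue.wait()  # returns once queueDone has been emitted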


#===============================================================================
# TaskDJ
#===============================================================================


class _TaskJob(queue.BaseJob):
    TASK_JOB_STATUS_MAP = {
        tasks.Status.WAITING: queue.JobState.WAITING,
        tasks.Status.RUNNING: queue.JobState.ACTIVE,
        tasks.Status.DONE: queue.JobState.DONE,
        tasks.Status.FAILED: queue.JobState.FAILED_RETRYABLE
    }

    def __init__(self, task, timeout=None):
        super(_TaskJob, self).__init__()
        self._task = task
        self.name = task.name
        self._timeout = timeout
        if jobtasks.is_jobtask(task):
            # Setting host ensures that TaskDJ doesn't use more than the
            # requested number of CPUs. This fixes PPW2 issue PPREP-1674.
            self.host = task.job_config.host_settings.host.name

    def doCommand(self):
        self._task.start()
        self._start_time = time.time()
        self._killed = False

    def update(self):
        self.state = self.TASK_JOB_STATUS_MAP[self._task.status]
        if self._timeout and not self._killed:
            if (time.time() - self._start_time) > self._timeout:
                print(f'TaskDJ killing {self._task.name} for running too long')
                try:
                    self._task.kill()
                except Exception:
                    print("WARNING: something went wrong while killing task.")
                self._killed = True

    def retryFailure(self, max_retries=0):
        """
        Called by JobDJ to determine whether we should retry this failed
        task.

        :param max_retries: the max number of retries allowed by the TaskDJ
        :return: whether to retry
        """
        return self.num_failures <= max_retries


class TaskDJ(queue.JobDJ):
    """
    WARNING: This class is slated for removal. To run tasks in parallel,
    consider using `run_tasks_in_parallel` below, or else use JobDJ
    directly via task.runToCmd and task.setJob.

    A subclass of JobDJ that supports running tasks.
    """

    def __init__(self, *args, task_timeout=None, **kwargs):
        super().__init__(*args, **kwargs)
        self._updated_jobs = []
        self._event_loop = QtCore.QEventLoop()
        self._job_dj_complete = False
        self._completed_tasks = scollections.IdSet()
        self._task_timeout = task_timeout

    def addTask(self, task):
        task_job = _TaskJob(task, timeout=self._task_timeout)
        self.addJob(task_job)

    def _startJobDJ(self):
        @qt_utils.exit_event_loop_on_exception
        def callback(job, *, event_loop=None):
            self._updated_jobs.append(job)
            event_loop.quit()

        callback = functools.partial(callback, event_loop=self._event_loop)

        @qt_utils.exit_event_loop_on_exception
        def run_jobdj(*, event_loop=None):
            try:
                self.run(status_change_callback=callback)
            finally:
                self._job_dj_complete = True
                event_loop.quit()

        run_jobdj = functools.partial(run_jobdj, event_loop=self._event_loop)
        self._job_dj_complete = False
        QtCore.QTimer.singleShot(0, run_jobdj)

    def updatedTasks(self):
        self._startJobDJ()
        while not self._job_dj_complete:
            yield from self._processUpdatedTasks()
            self._event_loop.exec()
        yield from self._processUpdatedTasks()
        exc = qt_utils.get_last_exception()
        if exc:
            raise exc

    def _processUpdatedTasks(self):
        """
        Yield the tasks from the jobs in `_updated_jobs`. Each task is
        yielded once when it starts and once when it completes. Calling
        this method empties out `_updated_jobs`.
        """
        for _ in range(len(self._updated_jobs)):
            job = self._updated_jobs.pop()
            if job._task.status is job._task.DONE:
                # Only yield a task if it hasn't already been yielded as
                # DONE before
                if job._task in self._completed_tasks:
                    continue
                self._completed_tasks.add(job._task)
            yield job._task
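
# A sketch of the intended consumption pattern (illustrative; `make_task`
# is a hypothetical factory returning jobtasks). Each task appears once
# when it starts and once when it finishes, per _processUpdatedTasks:
#
#     dj = TaskDJ(task_timeout=3600)  # kill any task running over an hour
#     for inp in inputs:
#         dj.addTask(make_task(inp))
#     for task in dj.updatedTasks():
#         if task.status is task.DONE:
#             print(f'{task.name} completed')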


#===============================================================================
# Utility functions
#===============================================================================


class AutoFileMode(enum.IntEnum):
    FAILED_LOG = enum.auto()  # Only log file from failed tasks
    FAILED_ALL = enum.auto()  # The entire taskdir for failed tasks
    NONE = enum.auto()  # No files
    LOG = enum.auto()  # Only the log file from all tasks
    ALL = enum.auto()  # The entire taskdir for all tasks


def _is_subprocesstask(t):
    return isinstance(t, tasks.SubprocessMixin)


def run_tasks_in_parallel(task_list, autoname=True, basename=None):
    """
    This function provides a convenient way of launching multiple tasks in
    parallel while taking care of boilerplate and preventing common
    mistakes.

    By default, this function will:
        - Give each task a unique name
        - Give each task its own directory
        - Create a new JobDJ or TaskQueue
        - Run it with the provided tasks
        - Register the log files of any failed tasks with jobserver if
          using jobtasks

    This can be used with either JobTasks or SubprocessTasks.
    SubprocessTasks will be run on a TaskQueue while JobTasks will be run
    on a JobDJ.

    :param task_list: the tasks to run
    :type task_list: list of tasks.AbstractTask

    :param autoname: whether to automatically give each task a unique name
    :type autoname: bool

    :param basename: basename to be used in autonaming each task. Has no
        effect if autoname is set to False
    :type basename: str

    :return: the list of tasks that completed successfully. This return
        value is useful for downstream processing, so the caller doesn't
        need to filter out failed tasks
    :rtype: list of tasks.AbstractTask
    """
    if task_list == []:
        return task_list
    elif all(jobtasks.is_jobtask(t) for t in task_list):
        return run_tasks_on_dj(task_list, autoname=autoname,
                               basename=basename)
    elif all(_is_subprocesstask(t) for t in task_list):
        return run_tasks_on_queue(task_list, autoname=autoname,
                                  basename=basename)
    else:
        raise ValueError("task_list must be a homogeneous list of either "
                         "subprocess tasks or job tasks.")
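
# Typical call pattern (illustrative; `PrepTask` is a hypothetical jobtask
# class). The return value already excludes failed tasks, so no extra
# filtering is needed downstream:
#
#     task_list = [PrepTask(input_file=f) for f in input_files]
#     succeeded = run_tasks_in_parallel(task_list, basename='prep')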


def run_tasks_on_queue(task_list, autoname=True, basename=None,
                       max_running_tasks=None):
    """
    Launch multiple SubprocessTasks on a TaskQueue. See
    `run_tasks_in_parallel` for more details.

    :param max_running_tasks: specifies how many tasks can be run
        simultaneously
    :type max_running_tasks: int
    """
    q = TaskQueue()
    if max_running_tasks:
        q.max_running_tasks = max_running_tasks
    if autoname:
        autoname_tasks(task_list, basename=basename)
    for t in task_list:
        q.addTask(t)
    q.start()
    q.wait()
    return [t for t in task_list if t.status is t.DONE]
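
# Sketch (illustrative; `HashTask` is a hypothetical SubprocessTask): cap
# concurrency below the TaskQueue default of four:
#
#     done = run_tasks_on_queue([HashTask(filename=f) for f in filenames],
#                               max_running_tasks=2)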


def run_tasks_on_dj(task_list, dj=None, autoname=True, basename=None,
                    auto_file_mode=AutoFileMode.FAILED_LOG):
    """
    Launch multiple jobtasks on a jobdj. Use this function over
    `run_tasks_in_parallel` if you know you'll be running jobtasks and you
    want to further customize how they're run.

    See `run_tasks_in_parallel` for details on more params.

    :param dj: A JobDJ to run the tasks on. This is optional and useful if
        you want a jobdj with specific settings.
    :type dj: schrodinger.job.queue.JobDJ

    :param auto_file_mode: what subtask files to register with the parent
        job to be copied back by jobcontrol. Only has an effect if this
        function is called from inside a jobcontrol backend.
    :type auto_file_mode: AutoFileMode
    """
    if task_list == []:
        return []
    dj = setup_dj_tasks(task_list, dj=dj, autoname=autoname,
                        basename=basename)
    dj.run()
    task_dict = {task.name: task for task in task_list}
    succeeded_tasks = []
    for jobdj_job in dj.all_jobs:
        job = jobdj_job.getJob()
        task = task_dict[job.Name]
        task.setJob(job)
        if task.status is task.DONE:
            succeeded_tasks.append(task)
    auto_register_files(task_list, auto_file_mode=auto_file_mode)
    return succeeded_tasks
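
# Sketch (illustrative; `DockTask` is a hypothetical jobtask class): pass a
# preconfigured JobDJ and keep the whole taskdir of any failed task:
#
#     dj = queue.JobDJ(max_failures=NOLIMIT)
#     done = run_tasks_on_dj([DockTask(inp) for inp in inputs], dj=dj,
#                            auto_file_mode=AutoFileMode.FAILED_ALL)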


def setup_dj_tasks(task_list, dj=None, autoname=True, basename=None):
    """
    Sets up a JobDJ with a collection of tasks. See run_tasks_on_dj for
    details.
    """
    if autoname:
        autoname_tasks(task_list, basename=basename)
    if dj is None:
        dj = queue.JobDJ(max_failures=NOLIMIT)
    for task in task_list:
        task.specifyTaskDir(tasks.AUTO_TASKDIR)
        # getTaskDir must be called after runToCmd
        try:
            cmd = task.runToCmd()
        except tasks.TaskFailure as e:
            if 'already exists' in str(e):
                raise RuntimeError(f"Task names must be unique. "
                                   f"\"{task.name}\" used twice")
            # Re-raise any other task failure; otherwise `cmd` would be
            # undefined (or stale) when addJob is called below.
            raise
        dj.addJob(cmd, command_dir=task.getTaskDir())
    return dj


def autoname_tasks(task_list, basename=None):
    """
    Sets unique names on all the tasks provided with an incrementing
    integer suffix. See run_tasks_on_dj for details.
    """
    if basename is None:
        basename = task_list[0].name
    for i, task in enumerate(task_list):
        task.name = f'{basename}_{i}'
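
# For example (illustrative; `ExampleTask` is hypothetical):
#
#     task_list = [ExampleTask(), ExampleTask(), ExampleTask()]
#     autoname_tasks(task_list, basename='align')
#     # task names are now 'align_0', 'align_1', 'align_2'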


def auto_register_files(task_list, auto_file_mode=AutoFileMode.FAILED_LOG):
    """
    Automatically registers files from the subtask with the parent job (if
    applicable).
    """
    backend = jobcontrol.get_backend()
    if backend is not None and auto_file_mode is not AutoFileMode.NONE:
        for task in task_list:
            logfile = task.getTaskFilename(f'{task.name}.log')
            taskdir = task.getTaskDir()
            if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
                # Legacy jobcontrol won't take absolute paths (JOBCON-7659)
                taskdir = os.path.relpath(taskdir)
                logfile = os.path.relpath(logfile)
            if auto_file_mode is AutoFileMode.LOG:
                backend.addOutputFile(logfile)
            elif auto_file_mode is AutoFileMode.ALL:
                backend.addOutputFile(taskdir)
            if task.status is task.FAILED:
                if auto_file_mode is AutoFileMode.FAILED_LOG:
                    backend.addOutputFile(logfile)
                if auto_file_mode is AutoFileMode.FAILED_ALL:
                    backend.addOutputFile(taskdir)