Source code for schrodinger.job.jobhandler

import os
import random
import string
from datetime import datetime
from typing import Callable
from typing import List
from typing import Optional

import schrodinger
from schrodinger.infra import jobhub
from schrodinger.Qt import QtCore
from schrodinger.utils import mmutil

from . import jobcontrol
from .download import download_job

# Temporary; remove once PANEL-19692 is complete. When True,
# _AbstractJobHandler._onJobDone prints a timestamped message each time it is
# called.
DEBUG: bool = False


class _AbstractJobHandler(QtCore.QObject):
    """
    Base class for job handlers.

    A handler builds the job command (adding ``-VIEWNAME`` and, inside Maestro,
    ``-PROJ``), listens to the job manager for progress and completion of the
    job it launched, and re-emits those events through its own signals.
    Subclasses implement `_launchJob`.

    :ivar jobCompleted: Signal emitted when the job is completed and downloaded.
        Under JOB_SERVER, a job can be complete but not downloaded, but this
        signal will only be emitted when a job has finished downloading.
    :ivar jobDownloadFailed: Signal emitted with the job and an error message
        when this handler downloads a finished job itself and the download
        fails.
    :ivar jobProgressChanged: Signal emitted with the job, current step, total
        steps and progress message when the job manager reports progress for
        this handler's job.
    :ivar job: The launched job, or None before the job has been launched.
    :ivar viewname: Viewname the job is launched with.
    """

    jobCompleted = QtCore.pyqtSignal(jobcontrol.Job)
    jobDownloadFailed = QtCore.pyqtSignal(jobcontrol.Job, str)
    jobProgressChanged = QtCore.pyqtSignal(jobcontrol.Job, int, int, str)

    def __init__(self,
                 cmd: List[str],
                 viewname: Optional[str] = None,
                 launch_dir: Optional[str] = None):
        """
        :param cmd: Job command. A copy is stored; the caller's list is not
            modified.
        :param viewname: Viewname for the job. A random 32-character string of
            uppercase letters and digits is used if not given.
        :param launch_dir: Directory to launch the job from. Defaults to the
            current working directory.
        """
        super().__init__()

        if viewname is None:
            # If no viewname is provided, we just give the job a random string
            # for a viewname
            viewname = "".join(
                random.choices(string.ascii_uppercase + string.digits, k=32))
        self._launch_dir = launch_dir if launch_dir else os.getcwd()
        self.viewname = viewname
        self.job = None
        # Set before any slot that reads it is connected.
        self._completed = False
        self._setupWaitLoop()
        self._setupJobCmd(cmd)
        # Remember exactly which (signal, slot) pairs were connected so that
        # _completeJob() disconnects those same pairs, even if the feature flag
        # or Maestro state that selected them has changed by then.
        self._job_manager_connections = self._getJobManagerSignalsAndSlots()
        for signal, slot in self._job_manager_connections:
            signal.connect(slot)

    def _setupJobCmd(self, cmd_list: List[str]):
        """
        Store the job command, adding a viewname and project if they are not
        already present.

        :param cmd_list: Job command. It is copied, so the caller's list is
            left untouched.
        """
        # Copy so the options appended below don't leak into the caller's
        # list (a reused command would otherwise keep this handler's viewname).
        cmd_list = list(cmd_list)
        if "-VIEWNAME" not in cmd_list:
            cmd_list += ["-VIEWNAME", self.viewname]
        # Calling schrodinger.get_maestro() directly to reduce the risk of
        # a unittest leaking a maestro mock
        maestro = schrodinger.get_maestro()
        if maestro and "-PROJ" not in cmd_list:
            pt = maestro.project_table_get()
            prj_name = pt.project_name
            cmd_list += ["-PROJ", prj_name]
        self._cmd_list = cmd_list

    def _setupWaitLoop(self):
        """
        Create the event loop used by `wait`, which quits when jobCompleted is
        emitted.
        """
        self._wait_loop = QtCore.QEventLoop()
        self.jobCompleted.connect(self._wait_loop.quit)

    def _getJobManagerSignalsAndSlots(self):
        """
        :return: (job manager signal, slot) pairs this handler listens to:
            progress updates plus the appropriate "job done" signal.
        :rtype: list[tuple]
        """
        jmgr = jobhub.get_job_manager()
        done_signal, done_slot = self._getDoneSignalAndSlot()
        return [
            (jmgr.jobProgressChanged, self._onJobProgressChanged),
            (done_signal, done_slot),
        ]

    def _getDoneSignalAndSlot(self):
        """
        Pick the job manager signal that marks the job as done and the slot to
        handle it.

        - JOB_SERVER off: ``jobCompleted`` -> `_onJobDone`.
        - JOB_SERVER on with auto-download (inside Maestro): ``jobDownloaded``
          -> `_onJobDone`.
        - Otherwise: ``jobCompleted`` -> `_onJobFinishedRunning`, which
          downloads the job itself.

        :return: (signal, slot)
        :rtype: tuple
        """
        jmgr = jobhub.get_job_manager()
        if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            # with JOB_SERVER off, the job is downloaded when it's completed so
            # no special handling is needed
            done_signal = jmgr.jobCompleted
            done_slot = self._onJobDone
        elif is_auto_download_active():
            # jobDownloaded is emitted when maestro downloads the job
            done_signal = jmgr.jobDownloaded
            done_slot = self._onJobDone
        else:
            done_signal = jmgr.jobCompleted
            # Without auto-download, need special handling for completed job
            done_slot = self._onJobFinishedRunning
        return done_signal, done_slot

    # ==========================================================================
    # _AbstractJobHandler API
    # ==========================================================================

    def launchJob(self, *, _debug_delay=None) -> jobcontrol.Job:
        """
        Launch the job.

        :raises RuntimeError: if this handler has already launched a job.
        """

        if self.job is not None:
            raise RuntimeError("Job has already been launched")
        return self._launchJob(_debug_delay=_debug_delay)

    def _launchJob(self, *, _debug_delay=None):
        """
        Subclass hook that actually launches the job.
        """
        raise NotImplementedError

    def wait(self):
        """
        Wait until this handler emits jobCompleted, running a local event loop
        so other UI events keep being processed.

        Returns immediately if the job already reports itself as complete.

        :raises RuntimeError: if the job has not been launched yet.
        """
        if self.job is None:
            raise RuntimeError("Can't wait until the job has been launched")
        # NOTE(review): under JOB_SERVER a job can be complete before it has
        # been downloaded; confirm whether this should check self._completed.
        if self.job.isComplete():
            return
        self._wait_loop.exec()

    # ==========================================================================
    # _AbstractJobHandler slots
    # ==========================================================================

    def _onJobFinishedRunning(self, job: jobcontrol.Job):
        """
        Download the job. Only called outside of maestro.

        Called when job finishes running. Under JOB_SERVER, a job completes
        when the (remote) calculations finish but before the results have been
        downloaded. Download failures are reported through jobDownloadFailed;
        the job is marked done either way.
        """
        if self.job is None or self._completed:
            return
        if job.JobId == self.job.JobId:
            try:
                download_error = download_job(job.JobId)
            except Exception as exc:
                self.jobDownloadFailed.emit(job, str(exc))
            else:
                if download_error:
                    self.jobDownloadFailed.emit(job, download_error)
            finally:
                self._onJobDone(job)

    def _onJobDone(self, job: jobcontrol.Job):
        """
        Called after job is both finished running and downloaded. Ignored for
        other jobs, before launch, and after completion.
        """
        if DEBUG:
            print(f"{datetime.now()} _onJobDone called")
        if self.job is None or self._completed:
            return
        if job.JobId == self.job.JobId:
            self.job = job
            self._completeJob()

    def _onJobProgressChanged(self, job: jobcontrol.Job, current_step: int,
                              total_steps: int, progress_msg: str):
        """
        Forward job manager progress updates for this handler's job through
        jobProgressChanged.
        """
        if self._completed:
            return
        if self.job is None or job.JobId != self.job.job_id:
            return
        self.job = job
        self.jobProgressChanged.emit(self.job, current_step, total_steps,
                                     progress_msg)

    # ==========================================================================
    # _AbstractJobHandler implementation methods
    # ==========================================================================

    def _completeJob(self):
        """
        Mark the current job as completed (i.e. downloaded), emit jobCompleted
        and stop listening to the job manager. Only the first call has any
        effect.
        """
        if self._completed:
            return
        self._completed = True
        self.jobCompleted.emit(self.job)
        for signal, slot in self._job_manager_connections:
            signal.disconnect(slot)

    def _checkForEarlyCompletion(self):
        """
        Handle a job that had already finished by the time this handler set
        self.job (its "done" signal was ignored while self.job was None).
        """
        # Preemptive job ID
        try:
            job = jobhub.get_cached_job(self.job.job_id)
        except jobhub.StdException:
            # The job manager did not recognize the job ID. This means the
            # `jobCompleted` signal was not emitted.
            return
        if job.isComplete():
            _, done_slot = self._getDoneSignalAndSlot()
            done_slot(job)

    def __del__(self):
        # Don't leave a wait() loop running for a handler being destroyed.
        if hasattr(self, "_wait_loop"):
            self._wait_loop.quit()


class JobHandler(_AbstractJobHandler):
    """
    A Job Handler for running and waiting on jobs.

    To use, initialize with a list of strings that you would use with
    JobViewFilter.launchJob. Then connect `my_jobhandler.jobCompleted` to any
    slots that need to be executed after the job is finished.

    The job handler also has a wait method that will pause execution of the
    current event until the job is finished. Note that during the wait, other
    UI events will continue to be processed.
    """

    # ==========================================================================
    # JobHandler API
    # ==========================================================================

    def _launchJob(self, *, _debug_delay=None) -> jobcontrol.Job:
        """
        Launch the job. An event loop is executed while job is being launched.

        :return: Job object for the started job.
        :rtype: jobcontrol.Job

        :raises JobLaunchFailure: if the job failed to start.

        NOTE: unlike jobcontrol.launch_job(), no dialog is shown on failure,
        so calling code is responsible for informing the user of the failure.
        """
        self.job = jobcontrol.launch_job(self._cmd_list,
                                         launch_dir=self._launch_dir,
                                         print_output=True,
                                         _debug_delay=_debug_delay)
        # The job may have finished while it was being launched. The slots
        # ignore "done" signals while self.job is None, so check for that now.
        self._checkForEarlyCompletion()
        return self.job
class AsyncJobHandler(_AbstractJobHandler):
    """
    A jobhandler that launches jobs asynchronously (i.e. launchJob doesn't
    wait for the job to actually start before returning).

    :ivar jobStarted: Signal emitted with the job once it has started.
    :ivar jobLaunchFailed: Signal emitted with a `jobcontrol.JobLaunchFailure`
        when launching fails with an error message.
    :ivar err_message: Error message from the most recent failed launch, or
        None.
    """
    jobStarted = QtCore.pyqtSignal(jobcontrol.Job)
    jobLaunchFailed = QtCore.pyqtSignal(Exception)

    def __init__(self, *args, **kwargs):
        """
        See _AbstractJobHandler for arguments.
        """
        super().__init__(*args, **kwargs)
        self.err_message = None

    # ==========================================================================
    # AsyncJobHandler slots
    # ==========================================================================

    def _onJobStarted(self, launched_job: jobcontrol.Job):
        """
        Slot for the launcher's jobStarted signal.

        A falsy job is treated as a launch failure and reported through
        jobLaunchFailed. Raising from this Qt slot would not reach any caller,
        because launchJob() has already returned.
        """
        job = launched_job
        if not job:
            self._onJobLaunchFailed('Launch failed (event loop killed)')
            return
        self.job = job
        self.err_message = None
        self._checkForEarlyCompletion()
        self.jobStarted.emit(job)

    def _onJobLaunchFailed(self, launch_err):
        """
        Slot for the launcher's jobLaunchFailed signal.

        :param launch_err: Error message describing the failure.
        """
        err_message = launch_err
        # NOTE(review): an empty message records the failure but emits nothing.
        if err_message:
            self.jobLaunchFailed.emit(jobcontrol.JobLaunchFailure(err_message))
        self.err_message = err_message

    # ==========================================================================
    # AsyncJobHandler implementation methods
    # ==========================================================================

    def _launchJob(self, *, _debug_delay=None):
        """
        Launch a job asynchronously. Returns before command is started.

        :param _debug_delay: Debug only. If given, handling of jobStarted is
            deferred by this many seconds.
        """
        # Clear any error left by a previous failed launch. Otherwise relaunching
        # after a failure would trip the assertion below.
        self.err_message = None
        viewname = ""  # this parameter is unused MAE-45178
        job_launcher = jobhub.JobLauncher(
            jobhub.JobCommand(self._cmd_list, viewname, self._launch_dir))
        if _debug_delay is None:
            job_launcher.jobStarted.connect(self._onJobStarted)
        else:
            # QTimer.singleShot() takes an integer number of milliseconds.
            delay_ms = int(_debug_delay * 1000)

            def slot(job):
                QtCore.QTimer.singleShot(delay_ms,
                                         lambda: self._onJobStarted(job))

            job_launcher.jobStarted.connect(slot)
        job_launcher.jobLaunchFailed.connect(self._onJobLaunchFailed)
        # Presumably kept on self so the launcher stays alive until it signals.
        self.job_launcher = job_launcher
        job_launcher.launch()
        # Confirm that neither signal was emitted yet
        assert self.job is None
        assert self.err_message is None
def job_incorporated(job_id: str, first_entry_id: int, last_entry_id: int):
    """
    Callback run by Maestro after it has successfully incorporated a job.

    Maestro calls this only when the job output goes through Maestro's own job
    incorporation. If a panel has registered its own incorporation handler via
    maestro.job_incorporation_function_add(), and that panel is open and has
    handled the incorporation, Maestro does not call this function.

    The job's viewname is used as the key into
    ``maestro_job_callbacks.maestro_job_incorporated_callbacks``. A callback
    found there is called with the same three arguments.

    :param job_id: The id of the incorporated job
    :param first_entry_id: The id of the first entry imported in the project
        from the output structure file associated with the given job.
    :param last_entry_id: The id of the last entry imported in the project
        from the output structure file associated with the given job.
    """
    try:
        cached_job = jobhub.get_cached_job(job_id)
    except jobhub.StdException:
        # No record of this job exists, so there is nothing to dispatch.
        return

    job_viewname = cached_job.Viewname
    if not job_viewname:
        return

    from schrodinger.maestro import maestro_job_callbacks
    registry = maestro_job_callbacks.maestro_job_incorporated_callbacks
    incorporated_callback = registry.get(job_viewname)
    if incorporated_callback is None:
        return
    incorporated_callback(job_id, first_entry_id, last_entry_id)
def connect_job_manager_signals():
    """
    Connect job manager signals to Python slots; called by Maestro.

    Does nothing outside Maestro. Inside Maestro, the job manager's
    ``readyForPythonIncorporation`` signal is connected to a handler. That
    handler reads the job's custom handler id, looks it up in
    ``maestro_job_callbacks.custom_job_completion_handlers``, and calls the
    registered function with the job data.
    """
    if not schrodinger.get_maestro():
        return

    # Custom job completion handlers; only active with the JOB_SERVER and
    # NEW_INCORPORATION_INFRA feature flags.
    def _dispatch_custom_completion(jobdata):
        custom_id = jobdata.getCustomHandlerId()
        if not custom_id:
            return
        from schrodinger.maestro import maestro_job_callbacks
        handlers = maestro_job_callbacks.custom_job_completion_handlers
        completion_handler = handlers.get(custom_id)
        # TODO PANEL-19836: determine how we want to report a missing handler
        if completion_handler is not None:
            completion_handler(jobdata)

    job_manager = jobhub.get_job_manager()
    job_manager.readyForPythonIncorporation.connect(_dispatch_custom_completion)
# =============================================================================== # Convenience functions # =============================================================================== _active_handlers = []
def launch_job_with_callback(
        cmd: List[str],
        callback: Callable[[jobcontrol.Job], None],
        launch_dir: Optional[str] = None) -> jobcontrol.Job:
    """
    Launch the given job, and call the specified callback when the job
    completes (either successfully or with a failure).

    :param cmd: Command list
    :param callback: Function to call when the job completes (either
        successfully or with a failure), with Job object as the only parameter.
    :param launch_dir: Directory to launch job under
    :return: The launched job.
    :raises RuntimeError: if the job fails to start. JobHandler documents this
        failure as JobLaunchFailure, presumably a RuntimeError subclass;
        confirm.
    """
    jhandler = JobHandler(cmd, launch_dir=launch_dir)
    # Keep the handler referenced until its job completes.
    _active_handlers.append(jhandler)

    def job_completed(job: jobcontrol.Job):
        if jhandler in _active_handlers:
            _active_handlers.remove(jhandler)
        callback(jhandler.job)

    jhandler.jobCompleted.connect(job_completed)
    try:
        jhandler.launchJob()
    except Exception:
        # The job never started, so job_completed() will never run to release
        # the handler. Drop it here so it doesn't leak, then propagate.
        if jhandler in _active_handlers:
            _active_handlers.remove(jhandler)
        raise
    return jhandler.job
def is_auto_download_active():
    """
    Report whether finished jobs are downloaded automatically.

    This is the case only when running inside Maestro with the JOB_SERVER
    feature flag enabled.

    :return: Whether job auto-downloading is enabled
    :rtype: bool
    """
    if not schrodinger.get_maestro():
        return False
    return bool(mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER))