# Source code for schrodinger.application.matsci.qb_sdk.job

# Copyright (c) 2021, Qu & Co
# All rights reserved.

# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:

# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.

# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# built-in
import pprint
import time
from typing import Optional

# local
from .client import QubecClient, get_client
from .parameters import QUBEC_SDK_TERMINATE_ON_FAIL, SEPARATOR, QubecSdkError
from .tools import get_logger

# Default value of the ``timeout`` argument of ``QubecJob.wait_completion``,
# expressed in seconds (24 hours).
DEFAULT_JOB_TIMEOUT = 24 * 60 * 60
# Pause, in seconds, between two consecutive job progress requests.
JOB_POLLING_TIME = 30

# Module-level logger, obtained from the package's own ``get_logger`` helper
# and named after this module.
logger = get_logger(__name__)


class QubecJob:
    """
    Class representing a running or completed job on the QUBEC platform

    Attributes:
        job_id (str): The unique ID assigned by QUBEC to the job
        experiment (str): A string representing the desired quantum algorithm
            to run
        backend_type (str): A string representing the type of quantum backend
            to use
        backend (dict): A dictionary with relevant backend information
        problem (dict): dictionary with problem definition
        parameters (dict): Any additional parameters which have been sent to
            the QUBEC job submission engine
        status (str): The status of the job
        result (dict): A dictionary with partial or full results of the job
        error (str): A string with an error message if any problem occurred
            during job submission
    """

    def __init__(
        self,
        job_id: str,
        experiment: Optional[str] = None,
        backend_type: Optional[str] = None,
        backend: Optional[dict] = None,
        problem: Optional[dict] = None,
        **parameters,
    ) -> None:
        """
        Initialize an instance of the QubecJob class representing a running
        or completed job on the QUBEC platform

        Args:
            job_id (str): The unique ID assigned by QUBEC to the job
            experiment (str): A string representing the desired quantum
                algorithm to run
            backend_type (str): A string representing the type of quantum
                backend to use
            backend (dict): A dictionary with relevant backend information
            problem (dict): dictionary with problem definition
            **parameters (dict): Any additional parameters which have been
                sent to the QUBEC job submission engine
        """
        self.job_id: str = job_id
        self.experiment: Optional[str] = experiment
        self.backend_type: Optional[str] = backend_type
        # A fresh dict per instance when nothing was supplied.
        self.backend: dict = backend if backend is not None else {}
        self.problem: dict = problem if problem is not None else {}
        self.parameters: dict = parameters
        self.status: str = "initialized"
        self.result: dict = {}
        self.error: Optional[str] = None

    def wait_completion(self, timeout: int = DEFAULT_JOB_TIMEOUT) -> None:
        """
        Wait until the job has completed by periodically querying the results
        in the backend. The session is refreshed before every query.

        On return, ``status`` holds the last status seen and either ``result``
        (job completed) or ``error`` (job failed, or the progress request
        itself failed) has been updated.

        Args:
            timeout (int): Number of seconds after which to stop listening
                for job progress. Only the pauses between polls are counted,
                not the time spent in the requests themselves.

        Raises:
            QubecSdkError: if the timeout is exceeded, or if the job failed
                or was cancelled while QUBEC_SDK_TERMINATE_ON_FAIL is set.
                An error raised by the progress request is NOT re-raised: it
                is stored in ``error`` and the wait ends.
        """
        client: QubecClient = get_client()
        elapsed = 0

        logger.info(SEPARATOR)
        while True:
            if elapsed > timeout:
                logger.warning(
                    f"Job {self.job_id} has timed out without completing.")
                data = client.progress_job(self.job_id)["data"]
                result = data.get("result", {})
                status = data.get("status", "unknown")
                raise QubecSdkError(
                    "The job has reached the timeout without terminating. " +
                    f"Final status: {status}. Partial results: {result}")

            client.refresh_session()

            # Only the request is guarded: a failing progress query ends the
            # wait quietly, while the deliberate "terminate on fail" errors
            # raised further down must reach the caller.
            try:
                content = client.progress_job(self.job_id)
            except QubecSdkError as e:
                self.error = str(e)
                break

            data = content["data"]
            self.status = data["status"]
            logger.info(
                f"Checking progress of job {self.job_id}. Current status: {self.status}"
            )

            # detailed job information
            details = data.get("details", "No information available")
            logger.info(
                f"Job progress information\n{pprint.pformat(details, indent=4, width=50)}\n"
            )

            # termination statuses (the one-second pause is kept from the
            # original implementation)
            finished = False
            if self.status == "completed":
                time.sleep(1)
                self.result = data.get("result", {})
                finished = True
            elif self.status == "failed":
                time.sleep(1)
                self.error = data.get("error_msg", "")
                msg = f"Job {self.job_id} has failed.\nDetailed error message: {self.error}"
                logger.warning(msg)
                if QUBEC_SDK_TERMINATE_ON_FAIL:
                    raise QubecSdkError(msg)
                finished = True
            elif self.status == "cancelled":
                time.sleep(1)
                msg = f"The job {self.job_id} has been cancelled by the user."
                logger.warning(msg)
                if QUBEC_SDK_TERMINATE_ON_FAIL:
                    raise QubecSdkError(msg)
                finished = True
            logger.info(SEPARATOR)

            if finished:
                # Leave right away instead of sleeping one more polling
                # interval after the job has already terminated.
                break

            time.sleep(JOB_POLLING_TIME)
            elapsed += JOB_POLLING_TIME

    def cancel(self) -> dict:
        """
        Send a remote cancellation request of the job

        This is best-effort: a failed request is logged as a warning and is
        not raised to the caller.

        Returns:
            A dictionary with the cancellation API response, or an empty
            dictionary if the cancellation request failed
        """
        client: QubecClient = get_client()
        logger.debug(SEPARATOR)
        try:
            content = client.cancel_job(self.job_id)
        except QubecSdkError as e:
            logger.warning(str(e))
            return {}
        logger.info(f"Successfully cancelled job {self.job_id}")
        self.status = "cancelled"
        return content

    def get_result(self, use_cached: bool = False) -> dict:
        """
        Get the available results of the current job

        Args:
            use_cached (bool): If true, do not perform any request and use
                the cached result values

        Returns:
            A dictionary with the job results, or an empty dictionary if they
            could not be retrieved
        """
        if use_cached:
            return self.result

        try:
            client: QubecClient = get_client()
            client.refresh_session()
            content = client.progress_job(self.job_id)
            self.result = content["data"]["result"]
            self.status = content["data"]["status"]
            return self.result
        except (KeyError, QubecSdkError):
            logger.warning(f"Cannot retrieve results of job {self.job_id}")
            return {}

    def get_error(self, use_cached: bool = False) -> Optional[str]:
        """
        Get the error message of the current job, if any

        Args:
            use_cached (bool): If true, do not perform any request and use
                the cached error value

        Returns:
            A string with the error message, or None if the response carries
            no error message or the request failed
        """
        if use_cached:
            return self.error

        try:
            client: QubecClient = get_client()
            client.refresh_session()
            content = client.progress_job(self.job_id)
            self.error = content["data"].get("error_msg")
            return self.error
        except (KeyError, QubecSdkError):
            logger.warning(f"Cannot retrieve error for job {self.job_id}")
            return None

    def get_status(self, use_cached: bool = False) -> Optional[str]:
        """
        Get the current status of the job

        Args:
            use_cached (bool): If true, do not perform any request and use
                the cached status value

        Returns:
            A string with the job status, or None if it could not be
            retrieved
        """
        if use_cached:
            # No client lookup and no session refresh on the cached path.
            return self.status

        try:
            client: QubecClient = get_client()
            client.refresh_session()
            content = client.progress_job(self.job_id)
            self.status = content["data"]["status"]
            return self.status
        except (KeyError, QubecSdkError):
            logger.warning(f"Cannot retrieve status of job {self.job_id}")
            return None

    def __str__(self):
        """
        Return a printable summary with the job status and its results

        Note that this calls ``get_result()`` without ``use_cached``, so
        building the string performs a request to the backend.
        """
        # Query first: get_result() may update self.status, and the status
        # shown must match the results printed below it.
        result = self.get_result()
        msg = f"""
        {SEPARATOR}
        Final job status: {self.status}
        Results
        """
        msg += pprint.pformat(result)
        return msg

    # TODO
    @staticmethod
    def from_id(job_id: str) -> "QubecJob":
        """
        Build a job object from the ID of an existing job

        Not implemented yet: the body is empty, so the call currently
        returns None.
        """
        pass