Source code for qiskit_serverless.core.job

# This code is a Qiskit project.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.


"""
=============================================
Provider (:mod:`qiskit_serverless.core.job`)
=============================================

.. currentmodule:: qiskit_serverless.core.job

Qiskit Serverless job
======================

.. autosummary::
    :toctree: ../stubs/

    RuntimeEnv
    Job
"""
# pylint: disable=duplicate-code
from abc import ABC, abstractmethod
import json
import logging
import os
import time
import warnings
from typing import ClassVar, Dict, Any, Literal, Optional, Union
from dataclasses import dataclass

import ray.runtime_env
import requests

from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.constants import (
    ENV_JOB_GATEWAY_INSTANCE,
    REQUESTS_TIMEOUT,
    ENV_JOB_GATEWAY_TOKEN,
    ENV_JOB_GATEWAY_HOST,
    ENV_JOB_ID_GATEWAY,
    ENV_GATEWAY_PROVIDER_VERSION,
    GATEWAY_PROVIDER_VERSION_DEFAULT,
    ENV_ACCESS_TRIAL,
)
from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.serializers.program_serializers import (
    QiskitObjectsEncoder,
    QiskitObjectsDecoder,
)
from qiskit_serverless.utils.http import get_headers
from qiskit_serverless.utils.json import is_jsonable

RuntimeEnv = ray.runtime_env.RuntimeEnv


@dataclass
class Configuration:  # pylint: disable=too-many-instance-attributes
    """Program Configuration.

    Args:
        workers: number of worker pod when auto scaling is NOT enabled
        auto_scaling: set True to enable auto scating of the workers
        min_workers: minimum number of workers when auto scaling is enabled
        max_workers: maxmum number of workers when auto scaling is enabled
    """

    workers: Optional[int] = None
    min_workers: Optional[int] = None
    max_workers: Optional[int] = None
    auto_scaling: Optional[bool] = False


class JobService(ABC):
    """Provide access to job information"""

    @abstractmethod
    def status(self, job_id: str) -> str:
        """Check status."""

    @abstractmethod
    def stop(
        self, job_id: str, service: Optional[QiskitRuntimeService] = None
    ) -> Union[str, bool]:
        """Stops job/program."""

    @abstractmethod
    def result(self, job_id: str) -> Any:
        """Return results."""

    @abstractmethod
    def logs(self, job_id: str) -> str:
        """Return logs."""

    @abstractmethod
    def filtered_logs(self, job_id: str, **kwargs) -> str:
        """Returns logs of the job.
        Args:
            job_id: The job's logs
            include: rex expression finds match in the log line to be included
            exclude: rex expression finds match in the log line to be excluded
        """


PendingType = Literal["PENDING"]
RunningType = Literal["RUNNING"]
StoppedType = Literal["STOPPED"]
SucceededType = Literal["SUCCEEDED"]
FailedType = Literal["FAILED"]
QueuedType = Literal["QUEUED"]
# RUNNING statuses
MappingType = Literal["MAPPING"]
OptimizingHardwareType = Literal["OPTIMIZING_HARDWARE"]
WaitingQpuType = Literal["WAITING_QPU"]
ExecutingQpuType = Literal["EXECUTING_QPU"]
PostProcessingType = Literal["POST_PROCESSING"]


[docs]class Job: """Job.""" PENDING: ClassVar[PendingType] = "PENDING" RUNNING: ClassVar[RunningType] = "RUNNING" STOPPED: ClassVar[StoppedType] = "STOPPED" SUCCEEDED: ClassVar[SucceededType] = "SUCCEEDED" FAILED: ClassVar[FailedType] = "FAILED" QUEUED: ClassVar[QueuedType] = "QUEUED" # RUNNING statuses MAPPING: ClassVar[MappingType] = "MAPPING" OPTIMIZING_HARDWARE: ClassVar[OptimizingHardwareType] = "OPTIMIZING_HARDWARE" WAITING_QPU: ClassVar[WaitingQpuType] = "WAITING_QPU" EXECUTING_QPU: ClassVar[ExecutingQpuType] = "EXECUTING_QPU" POST_PROCESSING: ClassVar[PostProcessingType] = "POST_PROCESSING"
[docs] def __init__( self, job_id: str, job_service: JobService, raw_data: Optional[Dict[str, Any]] = None, ): """Job class for async script execution. Args: job_id: if of the job client: client """ self.job_id = job_id self._job_service = job_service self.raw_data = raw_data or {}
def status(self): """Returns status of the job.""" return _map_status_to_serverless(self._job_service.status(self.job_id)) def stop(self, service: Optional[QiskitRuntimeService] = None): """Stops the job from running.""" warnings.warn( "`stop` method has been deprecated. " "And will be removed in future releases. " "Please, use `cancel` instead.", DeprecationWarning, ) return self.cancel(service) def cancel(self, service: Optional[QiskitRuntimeService] = None): """Cancels the job.""" return self._job_service.stop(self.job_id, service=service) def logs(self) -> str: """Returns logs of the job.""" return self._job_service.logs(self.job_id) def filtered_logs(self, **kwargs) -> str: """Returns logs of the job. Args: include: rex expression finds match in the log line to be included exclude: rex expression finds match in the log line to be excluded """ return self._job_service.filtered_logs(job_id=self.job_id, **kwargs) def error_message(self): """Returns the execution error message.""" return self._job_service.result(self.job_id) if self.status() == "ERROR" else "" def result(self, wait=True, cadence=30, verbose=False, maxwait=0): """Return results of the job. Args: wait: flag denoting whether to wait for the job result to be populated before returning cadence: time to wait between checking if job has been terminated verbose: flag denoting whether to log a heartbeat while waiting for job result to populate """ if wait: if verbose: logging.info("Waiting for job result.") count = 0 while not self.in_terminal_state() and (maxwait == 0 or count < maxwait): count += 1 time.sleep(cadence) if verbose: logging.info(count) # Retrieve the results. If they're string format, try to decode to a dictionary. results = self._job_service.result(self.job_id) if self.status() == "ERROR": raise QiskitServerlessException(results) if isinstance(results, str): try: results = json.loads(results, cls=QiskitObjectsDecoder) except json.JSONDecodeError: logging.warning("Error during results decoding.") return results def in_terminal_state(self) -> bool: """Checks if job is in terminal state""" terminal_status = ["CANCELED", "DONE", "ERROR"] return self.status() in terminal_status def __repr__(self): return f"<Job | {self.job_id}>"
[docs]def save_result(result: Dict[str, Any]): """Saves job results. Note: data passed to save_result function must be json serializable (use dictionaries). Default serializer is compatible with IBM QiskitRuntime provider serializer. List of supported types [ndarray, QuantumCircuit, Parameter, ParameterExpression, NoiseModel, Instruction]. See full list via link. Links: Source of serializer: https://github.com/Qiskit/qiskit-ibm-runtime/blob/0.14.0/qiskit_ibm_runtime/utils/json.py#L197 Example: >>> # save dictionary >>> save_result({"key": "value"}) >>> # save circuit >>> circuit: QuantumCircuit = ... >>> save_result({"circuit": circuit}) >>> # save primitives data >>> quasi_dists = Sampler.run(circuit).result().quasi_dists >>> # {"1x0": 0.1, ...} >>> save_result(quasi_dists) Args: result: data that will be accessible from job handler `.result()` method. """ version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) if version is None: version = GATEWAY_PROVIDER_VERSION_DEFAULT token = os.environ.get(ENV_JOB_GATEWAY_TOKEN) if token is None: logging.warning( "Results will be saved as logs since " "there is no information about the " "authorization token in the environment." ) logging.info("Result: %s", result) result_record = json.dumps(result or {}, cls=QiskitObjectsEncoder) print(f"\nSaved Result:{result_record}:End Saved Result\n") return False instance = os.environ.get(ENV_JOB_GATEWAY_INSTANCE, None) if not is_jsonable(result, cls=QiskitObjectsEncoder): logging.warning("Object passed is not json serializable.") return False url = ( f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/" f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/result/" ) response = requests.post( url, data={"result": json.dumps(result or {}, cls=QiskitObjectsEncoder)}, headers=get_headers(token=token, instance=instance), timeout=REQUESTS_TIMEOUT, ) if not response.ok: sanitized = response.text.replace("\n", "").replace("\r", "") logging.warning("Something went wrong: %s", sanitized) return response.ok
def update_status(status: str): """Update sub status.""" version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) if version is None: version = GATEWAY_PROVIDER_VERSION_DEFAULT token = os.environ.get(ENV_JOB_GATEWAY_TOKEN) if token is None: logging.warning( "'sub_status' cannot be updated since " "there is no information about the " "authorization token in the environment." ) return False instance = os.environ.get(ENV_JOB_GATEWAY_INSTANCE, None) url = ( f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/" f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/sub_status/" ) response = requests.patch( url, data={"sub_status": status}, headers=get_headers(token=token, instance=instance), timeout=REQUESTS_TIMEOUT, ) if not response.ok: sanitized = response.text.replace("\n", "").replace("\r", "") logging.warning("Something went wrong: %s", sanitized) return response.ok def _map_status_to_serverless(status: str) -> str: """Map a status string from job client to the Qiskit terminology.""" status_map = { Job.PENDING: "INITIALIZING", Job.RUNNING: "RUNNING", Job.STOPPED: "CANCELED", Job.SUCCEEDED: "DONE", Job.FAILED: "ERROR", Job.QUEUED: "QUEUED", Job.MAPPING: "RUNNING: MAPPING", Job.OPTIMIZING_HARDWARE: "RUNNING: OPTIMIZING_FOR_HARDWARE", Job.WAITING_QPU: "RUNNING: WAITING_FOR_QPU", Job.EXECUTING_QPU: "RUNNING: EXECUTING_QPU", Job.POST_PROCESSING: "RUNNING: POST_PROCESSING", } try: return status_map[status] except KeyError: return status
[docs]def is_running_in_serverless() -> bool: """Return ``True`` if running as a Qiskit serverless program, ``False`` otherwise.""" return ENV_JOB_ID_GATEWAY in os.environ
def is_trial() -> bool: """Return ``True`` if Job is running in trial mode, ``False`` otherwise.""" return os.getenv(ENV_ACCESS_TRIAL) == "True"