Source code for qiskit_serverless.core.job

# This code is a Qiskit project.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.


"""
=============================================
Provider (:mod:`qiskit_serverless.core.job`)
=============================================

.. currentmodule:: qiskit_serverless.core.job

Qiskit Serverless job
======================

.. autosummary::
    :toctree: ../stubs/

    RuntimeEnv
    Job
"""
# pylint: disable=duplicate-code
from abc import ABC, abstractmethod
import json
import logging
import os
import time
import warnings
from typing import Dict, Any, Optional, Union
from dataclasses import dataclass

import ray.runtime_env
import requests

from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.constants import (
    REQUESTS_TIMEOUT,
    ENV_JOB_GATEWAY_TOKEN,
    ENV_JOB_GATEWAY_HOST,
    ENV_JOB_ID_GATEWAY,
    ENV_GATEWAY_PROVIDER_VERSION,
    GATEWAY_PROVIDER_VERSION_DEFAULT,
)

from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.serializers.program_serializers import (
    QiskitObjectsEncoder,
    QiskitObjectsDecoder,
)
from qiskit_serverless.utils.json import is_jsonable

RuntimeEnv = ray.runtime_env.RuntimeEnv


@dataclass
class Configuration:  # pylint: disable=too-many-instance-attributes
    """Program Configuration.

    Args:
        workers: number of worker pod when auto scaling is NOT enabled
        auto_scaling: set True to enable auto scating of the workers
        min_workers: minimum number of workers when auto scaling is enabled
        max_workers: maxmum number of workers when auto scaling is enabled
    """

    workers: Optional[int] = None
    min_workers: Optional[int] = None
    max_workers: Optional[int] = None
    auto_scaling: Optional[bool] = False


class JobService(ABC):
    """Provide access to job information"""

    @abstractmethod
    def status(self, job_id: str) -> str:
        """Check status."""

    @abstractmethod
    def stop(
        self, job_id: str, service: Optional[QiskitRuntimeService] = None
    ) -> Union[str, bool]:
        """Stops job/program."""

    @abstractmethod
    def result(self, job_id: str) -> Any:
        """Return results."""

    @abstractmethod
    def logs(self, job_id: str) -> str:
        """Return logs."""

    @abstractmethod
    def filtered_logs(self, job_id: str, **kwargs) -> str:
        """Returns logs of the job.
        Args:
            job_id: The job's logs
            include: rex expression finds match in the log line to be included
            exclude: rex expression finds match in the log line to be excluded
        """


[docs]class Job: """Job."""
[docs] def __init__( self, job_id: str, job_service: JobService, raw_data: Optional[Dict[str, Any]] = None, ): """Job class for async script execution. Args: job_id: if of the job client: client """ self.job_id = job_id self._job_service = job_service self.raw_data = raw_data or {}
def status(self): """Returns status of the job.""" return _map_status_to_serverless(self._job_service.status(self.job_id)) def stop(self, service: Optional[QiskitRuntimeService] = None): """Stops the job from running.""" warnings.warn( "`stop` method has been deprecated. " "And will be removed in future releases. " "Please, use `cancel` instead.", DeprecationWarning, ) return self.cancel(service) def cancel(self, service: Optional[QiskitRuntimeService] = None): """Cancels the job.""" return self._job_service.stop(self.job_id, service=service) def logs(self) -> str: """Returns logs of the job.""" return self._job_service.logs(self.job_id) def filtered_logs(self, **kwargs) -> str: """Returns logs of the job. Args: include: rex expression finds match in the log line to be included exclude: rex expression finds match in the log line to be excluded """ return self._job_service.filtered_logs(job_id=self.job_id, **kwargs) def error_message(self): """Returns the execution error message.""" return self._job_service.result(self.job_id) if self.status() == "ERROR" else "" def result(self, wait=True, cadence=5, verbose=False, maxwait=0): """Return results of the job. Args: wait: flag denoting whether to wait for the job result to be populated before returning cadence: time to wait between checking if job has been terminated verbose: flag denoting whether to log a heartbeat while waiting for job result to populate """ if wait: if verbose: logging.info("Waiting for job result.") count = 0 while not self.in_terminal_state() and (maxwait == 0 or count < maxwait): count += 1 time.sleep(cadence) if verbose: logging.info(count) # Retrieve the results. If they're string format, try to decode to a dictionary. results = self._job_service.result(self.job_id) if self.status() == "ERROR": raise QiskitServerlessException(results) if isinstance(results, str): try: results = json.loads(results, cls=QiskitObjectsDecoder) except json.JSONDecodeError: logging.warning("Error during results decoding.") return results def in_terminal_state(self) -> bool: """Checks if job is in terminal state""" terminal_states = ["CANCELED", "DONE", "ERROR"] return self.status() in terminal_states def __repr__(self): return f"<Job | {self.job_id}>"
[docs]def save_result(result: Dict[str, Any]): """Saves job results. Note: data passed to save_result function must be json serializable (use dictionaries). Default serializer is compatible with IBM QiskitRuntime provider serializer. List of supported types [ndarray, QuantumCircuit, Parameter, ParameterExpression, NoiseModel, Instruction]. See full list via link. Links: Source of serializer: https://github.com/Qiskit/qiskit-ibm-runtime/blob/0.14.0/qiskit_ibm_runtime/utils/json.py#L197 Example: >>> # save dictionary >>> save_result({"key": "value"}) >>> # save circuit >>> circuit: QuantumCircuit = ... >>> save_result({"circuit": circuit}) >>> # save primitives data >>> quasi_dists = Sampler.run(circuit).result().quasi_dists >>> # {"1x0": 0.1, ...} >>> save_result(quasi_dists) Args: result: data that will be accessible from job handler `.result()` method. """ version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) if version is None: version = GATEWAY_PROVIDER_VERSION_DEFAULT token = os.environ.get(ENV_JOB_GATEWAY_TOKEN) if token is None: logging.warning( "Results will be saved as logs since" "there is no information about the" "authorization token in the environment." ) logging.info("Result: %s", result) result_record = json.dumps(result or {}, cls=QiskitObjectsEncoder) print(f"\nSaved Result:{result_record}:End Saved Result\n") return False if not is_jsonable(result, cls=QiskitObjectsEncoder): logging.warning("Object passed is not json serializable.") return False url = ( f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/" f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/result/" ) response = requests.post( url, data={"result": json.dumps(result or {}, cls=QiskitObjectsEncoder)}, headers={"Authorization": f"Bearer {token}"}, timeout=REQUESTS_TIMEOUT, ) if not response.ok: sanitized = response.text.replace("\n", "").replace("\r", "") logging.warning("Something went wrong: %s", sanitized) return response.ok
def _map_status_to_serverless(status: str) -> str: """Map a status string from job client to the Qiskit terminology.""" status_map = { "PENDING": "INITIALIZING", "RUNNING": "RUNNING", "STOPPED": "CANCELED", "SUCCEEDED": "DONE", "FAILED": "ERROR", "QUEUED": "QUEUED", } try: return status_map[status] except KeyError: return status
[docs]def is_running_in_serverless() -> bool: """Return ``True`` if running as a Qiskit serverless program, ``False`` otherwise.""" return "ENV_JOB_ID_GATEWAY" in os.environ