# Source code for qiskit_serverless.core.clients.serverless_client

# This code is a Qiskit project.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
================================================
Provider (:mod:`qiskit_serverless.core.client`)
================================================

.. currentmodule:: qiskit_serverless.core.client

Qiskit Serverless provider
===========================

.. autosummary::
    :toctree: ../stubs/

    ServerlessClient
"""

# pylint: disable=duplicate-code,too-many-lines
import json
import os.path
import os
import re
import tarfile
import warnings
from pathlib import Path
from urllib.parse import urlparse
from dataclasses import asdict
from typing import Optional, List, Dict, Any, Union
from collections.abc import Callable

import requests
from opentelemetry import trace
from qiskit.providers.backend import BackendV2 as Backend
from qiskit.providers.exceptions import QiskitBackendNotFoundError
from qiskit_ibm_runtime import QiskitRuntimeService, IBMBackend
from qiskit_ibm_runtime.accounts.exceptions import InvalidAccountError

from qiskit_serverless.core.constants import (
    REQUESTS_TIMEOUT,
    ENV_GATEWAY_PROVIDER_HOST,
    ENV_GATEWAY_PROVIDER_VERSION,
    ENV_GATEWAY_PROVIDER_TOKEN,
    GATEWAY_PROVIDER_VERSION_DEFAULT,
    IBM_SERVERLESS_HOST_URL,
    MAX_ARTIFACT_FILE_SIZE_MB,
    USAGE_LOW_THRESHOLD_SECONDS,
    USAGE_ZERO_EPSILON_SECONDS,
)
from qiskit_serverless.core.client import BaseClient
from qiskit_serverless.core.decorators import trace_decorator_factory
from qiskit_serverless.core.enums import Channel
from qiskit_serverless.core.files import GatewayFilesClient
from qiskit_serverless.core.job import (
    Job,
    Configuration,
    _map_status_to_serverless,
)
from qiskit_serverless.core.job_event import JobEvent

from qiskit_serverless.core.function import (
    QiskitFunction,
    RunService,
    RunnableQiskitFunction,
)

from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.utils.http import get_headers
from qiskit_serverless.utils.json import (
    safe_json_request_as_dict,
    safe_json_request_as_list,
    safe_json_request,
)
from qiskit_serverless.utils.formatting import format_provider_name_and_title
from qiskit_serverless.serializers.program_serializers import (
    QiskitObjectsEncoder,
    QiskitObjectsDecoder,
)

_trace_job = trace_decorator_factory("job")
_trace_functions = trace_decorator_factory("function")


class ServerlessClient(BaseClient):  # pylint: disable=too-many-public-methods
    """
    A client for connecting to a specified host.

    Example:
        >>> client = ServerlessClient(
        >>>    name="<NAME>",
        >>>    host="<HOST>",
        >>>    token="<TOKEN>",
        >>>    instance="<CRN>",
        >>> )
    """

    def __init__(  # pylint: disable=too-many-positional-arguments
        self,
        name: Optional[str] = None,
        host: Optional[str] = None,
        version: Optional[str] = None,
        token: Optional[str] = None,
        instance: Optional[str] = None,
        channel: Optional[str] = None,
    ):
        """
        Initializes the ServerlessClient instance.

        Args:
            name: name of client
            host: host of gateway. If None, it uses the ENV_GATEWAY_PROVIDER_HOST env var
            version: version of gateway. If None, it uses the ENV_GATEWAY_PROVIDER_VERSION
                env var and finally GATEWAY_PROVIDER_VERSION_DEFAULT
            token: authorization token. If None, it uses the ENV_GATEWAY_PROVIDER_TOKEN env var
            instance: IBM Cloud CRN
            channel: identifies the method to use to authenticate the user

        Raises:
            QiskitServerlessException: if no host or token can be resolved, if the
                IBM Cloud / IBM Quantum Platform channel is used without an instance,
                or if the credentials cannot be verified against the gateway.
            ValueError: if ``channel`` is not a valid ``Channel`` value.
        """
        name = name or "gateway-client"
        host = host or os.environ.get(ENV_GATEWAY_PROVIDER_HOST)
        if host is None:
            raise QiskitServerlessException("Please provide `host` of gateway.")

        version = version or os.environ.get(ENV_GATEWAY_PROVIDER_VERSION)
        if version is None:
            version = GATEWAY_PROVIDER_VERSION_DEFAULT

        token = token or os.environ.get(ENV_GATEWAY_PROVIDER_TOKEN)
        if token is None:
            raise QiskitServerlessException("Authentication credentials must be provided in form of `token`.")

        channel = channel or Channel.IBM_QUANTUM_PLATFORM.value
        try:
            channel_enum = Channel(channel)
        except ValueError as error:
            raise ValueError(
                "Your channel value is not correct. Use one of the available channels: "
                f"{Channel.LOCAL.value}, "
                f"{Channel.IBM_CLOUD.value}, {Channel.IBM_QUANTUM_PLATFORM.value}"
            ) from error

        if channel_enum is Channel.IBM_CLOUD and instance is None:
            raise QiskitServerlessException("Authentication with IBM Cloud requires to pass the CRN as an instance.")

        if channel_enum is Channel.IBM_QUANTUM_PLATFORM and instance is None:
            raise QiskitServerlessException(
                "Authentication with IBM Quantum Platform requires to pass the CRN as an instance."
            )

        super().__init__(name, host, token, instance, channel)
        self.version = version
        # Fail fast: a client with bad credentials is never handed back to the caller.
        self._verify_credentials()

        self._files_client = GatewayFilesClient(self.host, self.token, self.version, self.instance, self.channel)

    @classmethod
    def from_dict(cls, dictionary: dict):
        """Build a client from a dictionary of constructor keyword arguments.

        NOTE(review): this always instantiates ``ServerlessClient`` rather than
        ``cls``; kept unchanged because subclasses in this module accept a
        different set of constructor arguments.
        """
        return ServerlessClient(**dictionary)

    def _auth_headers(self) -> Dict[str, str]:
        """Return the request headers built from this client's token, instance and channel."""
        return get_headers(token=self.token, instance=self.instance, channel=self.channel)

    def _verify_credentials(self):
        """Verify against the API that the credentials are correct."""
        # NOTE(review): this endpoint is pinned to ``v1`` instead of ``self.version`` —
        # confirm that this is intentional.
        try:
            safe_json_request(
                request=lambda: requests.get(
                    url=f"{self.host}/api/v1/programs/",
                    headers=self._auth_headers(),
                    timeout=REQUESTS_TIMEOUT,
                )
            )
        except QiskitServerlessException as reason:
            raise QiskitServerlessException("Credentials couldn't be verified.") from reason

    def dependencies_versions(self):
        """Get the list of available dependencies and its versions for creating functions"""
        # The channel is sent here too, so this call is authenticated exactly
        # like every other gateway request issued by this client.
        return safe_json_request_as_list(
            request=lambda: requests.get(
                url=f"{self.host}/api/{self.version}/dependencies-versions/",
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
        )

    ####################
    ####### JOBS #######
    ####################

    def _query_jobs(
        self,
        url: str,
        function: Optional[QiskitFunction],
        params: Dict[str, Any],
    ) -> List[Job]:
        """Fill in the default listing parameters and fetch jobs from ``url``.

        Args:
            url: full URL of the job listing endpoint.
            function: when given, its title and provider are added as filters.
            params: query parameters supplied by the caller; updated in place.

        Returns:
            One ``Job`` per entry of the ``results`` key of the response.
        """
        params.setdefault("limit", 10)
        params.setdefault("offset", 0)

        status = params.get("status")
        if status:
            mapped_status, _ = _map_status_to_serverless(status)
            params["status"] = mapped_status

        params.setdefault("created_after", None)

        if function:
            params["function"] = function.title
            params["provider"] = function.provider

        response_data = safe_json_request_as_dict(
            request=lambda: requests.get(
                url,
                params=params,
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
        )

        return [
            Job(job.get("id"), job_service=self, raw_data=job, compute_profile=job.get("compute_profile"))
            for job in response_data.get("results", [])
        ]

    @_trace_job("list")
    def jobs(self, function: Optional[QiskitFunction] = None, **kwargs) -> List[Job]:
        """Retrieve a list of jobs with optional filtering.

        Args:
            function (QiskitFunction): The function that created the jobs we want to retrieve.
            limit (int, optional): Maximum number of jobs to return. Defaults to 10.
            offset (int, optional): Number of jobs to skip. Defaults to 0.
            status (str, optional): Filter by job status.
            created_after (str, optional): Filter jobs created after this timestamp.
            **kwargs: Additional query parameters.

        Returns:
            List[Job]: List of Job objects matching the criteria.
        """
        return self._query_jobs(f"{self.host}/api/{self.version}/jobs/", function, kwargs)

    @_trace_job("provider_list")
    def provider_jobs(self, function: Optional[QiskitFunction], **kwargs) -> List[Job]:
        """Retrieve jobs for a specific provider and function.

        Args:
            function (QiskitFunction): Function object.
            limit (int, optional): Maximum number of jobs to return. Defaults to 10.
            offset (int, optional): Number of jobs to skip. Defaults to 0.
            status (str, optional): Filter by job status.
            created_after (str, optional): Filter jobs created after this timestamp.
            **kwargs: Additional query parameters.

        Returns:
            List[Job]: List of Job objects for the specified provider and function.

        Raises:
            QiskitServerlessException: If the function is missing or doesn't have
                an associated provider.
        """
        # ``function`` is typed Optional: guard against None so the caller gets the
        # documented exception instead of an AttributeError.
        if function is None or not function.provider:
            raise QiskitServerlessException("`function` doesn't have a provider.")

        return self._query_jobs(f"{self.host}/api/{self.version}/jobs/provider/", function, kwargs)

    @_trace_job("get")
    def job(self, job_id: str) -> Optional[Job]:
        """Retrieve a single job by id.

        The job is requested without its result (``with_result=false``).

        Args:
            job_id: id of the job to retrieve.

        Returns:
            A ``Job`` handle, or ``None`` when the response carries no ``id``.
        """
        url = f"{self.host}/api/{self.version}/jobs/{job_id}/"
        response_data = safe_json_request_as_dict(
            request=lambda: requests.get(
                url,
                headers=self._auth_headers(),
                params={"with_result": "false"},
                timeout=REQUESTS_TIMEOUT,
            )
        )

        found_id = response_data.get("id")
        if found_id is None:
            return None

        return Job(
            job_id=found_id,
            job_service=self,
            compute_profile=response_data.get("compute_profile"),
        )

    def run(
        self,
        program: Union[QiskitFunction, str],
        arguments: Optional[Dict[str, Any]] = None,
        config: Optional[Configuration] = None,
        provider: Optional[str] = None,
        *,
        compute_profile: Optional[str] = None,
    ) -> Job:
        """Submit a function execution to the gateway.

        Args:
            program: ``QiskitFunction`` object or function title. When a
                ``QiskitFunction`` is given its own provider overrides ``provider``.
            arguments: arguments of the run; serialized with ``QiskitObjectsEncoder``.
            config: execution configuration. A default ``Configuration()`` is
                sent when omitted.
            provider: provider name, used only when ``program`` is a title.
            compute_profile: optional compute-profile name.

        Returns:
            ``Job`` handle built from the ``id`` of the gateway response.
        """
        if isinstance(program, QiskitFunction):
            title = program.title
            provider = program.provider
        else:
            title = str(program)

        tracer = trace.get_tracer("client.tracer")
        with tracer.start_as_current_span("job.run") as span:
            span.set_attribute("function", title)
            span.set_attribute("provider", provider)
            span.set_attribute("arguments", str(arguments))

            url = f"{self.host}/api/{self.version}/programs/run/"

            data: Dict[str, Any] = {
                "title": title,
                "provider": provider,
                "compute_profile": compute_profile,
                "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
            }
            data["config"] = asdict(config) if config else asdict(Configuration())

            response_data = safe_json_request_as_dict(
                request=lambda: requests.post(
                    url=url,
                    json=data,
                    headers=self._auth_headers(),
                    timeout=REQUESTS_TIMEOUT,
                )
            )
            job_id = response_data.get("id")
            span.set_attribute("job.id", job_id)

        return Job(job_id, job_service=self, compute_profile=response_data.get("compute_profile"))

    def get_job_data(self, job_id: str) -> Optional[dict]:
        """Return the raw job payload (without result), or ``None`` when it is empty.

        Args:
            job_id: id of the job.
        """
        return (
            safe_json_request_as_dict(
                request=lambda: requests.get(
                    f"{self.host}/api/{self.version}/jobs/{job_id}/",
                    params={"with_result": "false"},
                    headers=self._auth_headers(),
                    timeout=REQUESTS_TIMEOUT,
                )
            )
            or None
        )

    @_trace_job
    def status(self, job_id: str):
        """Return the status of a job.

        Args:
            job_id: id of the job.

        Returns:
            The ``sub_status`` when the job is running and a sub-status is
            reported, otherwise the ``status`` field (``"Unknown"`` if absent).
        """
        default_status = "Unknown"
        response_data = safe_json_request_as_dict(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/jobs/{job_id}/",
                params={"with_result": "false"},
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
        )

        status = response_data.get("status", default_status)
        sub_status = response_data.get("sub_status")
        if status == Job.RUNNING and sub_status is not None:
            return sub_status
        return status

    @_trace_job
    def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
        """Request the gateway to stop a job.

        Args:
            job_id: id of the job to stop.
            service: runtime service forwarded to the gateway. When omitted one
                is created from this client's channel, instance and token; if
                that fails with ``InvalidAccountError`` a warning is emitted and
                the request is sent without a service.

        Returns:
            The ``message`` field of the gateway response.
        """
        if not service:
            try:
                service = QiskitRuntimeService(channel=self.channel, instance=self.instance, token=self.token)
            except InvalidAccountError:
                warnings.warn(
                    "No QiskitRuntimeService can be associated to the given token and instance. "
                    "Continuing without a QiskitRuntimeService."
                )
                service = None

        data: dict[str, Any] = {
            "service": json.dumps(service, cls=QiskitObjectsEncoder),
        }
        response_data = safe_json_request_as_dict(
            request=lambda: requests.post(
                f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
                json=data,
            )
        )

        return response_data.get("message")

    @_trace_job
    def result(self, job_id: str) -> Dict[str, Any]:
        """Fetch and decode the result of a job.

        Args:
            job_id: id of the job.

        Returns:
            The decoded result. An empty dict is returned on HTTP 204 and when
            a redirected download did not succeed.
        """
        gateway_url = f"{self.host}/api/{self.version}/jobs/{job_id}/result/"
        response = requests.get(
            gateway_url,
            headers=self._auth_headers(),
            timeout=REQUESTS_TIMEOUT,
        )

        if response.status_code == 204:
            return {}

        # Not all redirects go to COS — HTTP→HTTPS redirects stay on the same host.
        # Checking the hostname detects only redirects to an external host (COS/MinIO).
        redirected_to_cos = urlparse(response.url).hostname != urlparse(gateway_url).hostname
        if redirected_to_cos:
            return json.loads(response.text, cls=QiskitObjectsDecoder) if response.ok else {}

        return json.loads(response.json().get("result", "{}") or "{}", cls=QiskitObjectsDecoder)

    def _fetch_job_logs(self, job_id: str, endpoint: str):
        """Download the logs exposed by ``/jobs/<job_id>/<endpoint>/``.

        Returns ``"No logs yet."`` on HTTP 204; the raw body (or
        ``"Error fetching logs."`` on failure) when the request was redirected to
        another host; otherwise the ``logs`` field of the JSON response.
        """
        gateway_url = f"{self.host}/api/{self.version}/jobs/{job_id}/{endpoint}/"
        response = requests.get(
            gateway_url,
            headers=self._auth_headers(),
            timeout=REQUESTS_TIMEOUT,
        )

        if response.status_code == 204:
            return "No logs yet."

        # Not all redirects go to COS — HTTP→HTTPS redirects stay on the same host.
        # Checking the hostname detects only redirects to an external host (COS/MinIO).
        redirected_to_cos = urlparse(response.url).hostname != urlparse(gateway_url).hostname
        if redirected_to_cos:
            return response.text if response.ok else "Error fetching logs."

        return safe_json_request_as_dict(request=lambda: response).get("logs")

    @_trace_job
    def logs(self, job_id: str):
        """Return the user logs of a job.

        Args:
            job_id: id of the job.
        """
        return self._fetch_job_logs(job_id, "logs")

    @_trace_job
    def provider_logs(self, job_id: str):
        """Return the provider logs of a job.

        Args:
            job_id: id of the job.
        """
        return self._fetch_job_logs(job_id, "provider-logs")

    @_trace_job
    def runtime_jobs(self, job_id: str, runtime_session: Optional[str] = None) -> list[str]:
        """Retrieve Qiskit IBM Runtime job ids that correspond to a given serverless
        job_id execution and, optionally, filtered by session id."""
        response_data = safe_json_request_as_dict(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/jobs/{job_id}/runtime_jobs/",
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
        )

        if runtime_session:
            return [
                job.get("runtime_job")
                for job in response_data.get("runtime_jobs", [])
                if job.get("runtime_session") == runtime_session
            ]

        return [job.get("runtime_job") for job in response_data.get("runtime_jobs", [])]

    @_trace_job
    def runtime_sessions(self, job_id: str):
        """Retrieve Qiskit IBM Runtime session ids that correspond to a given
        serverless job_id execution."""
        response_data = safe_json_request_as_dict(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/jobs/{job_id}/runtime_jobs/",
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
        )

        runtime_jobs = response_data.get("runtime_jobs", [])
        # A set removes duplicates; sorting makes the output order deterministic.
        out_sessions = sorted({job["runtime_session"] for job in runtime_jobs if job.get("runtime_session")})
        return out_sessions

    def filtered_logs(self, job_id: str, **kwargs):
        """Return the job logs filtered line by line with regular expressions.

        Args:
            job_id: id of the job.
            include: optional regex; only lines matching it are kept.
            exclude: optional regex; lines matching it (and empty lines) are dropped.

        Returns:
            The filtered log text. Every line kept by a filter is terminated by
            a newline; when no filter is given the logs are returned unchanged.
        """
        # ``logs`` may yield None when the gateway response has no "logs" key;
        # treat that as empty text instead of failing on ``None.split``.
        all_logs = self.logs(job_id=job_id) or ""

        include = kwargs.get("include")
        if include is not None:
            included = "".join(
                f"{line}\n" for line in all_logs.split("\n") if re.search(include, line) is not None
            )
        else:
            included = all_logs

        exclude = kwargs.get("exclude")
        if exclude is None:
            return included

        return "".join(
            f"{line}\n" for line in included.split("\n") if line != "" and re.search(exclude, line) is None
        )

    def events(self, job_id: str, **kwargs) -> list[JobEvent]:
        """Returns events of the job.

        Args:
            job_id: The job id
            **kwargs: Additional query parameters forwarded to the gateway.
        """
        response_data = safe_json_request_as_list(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/jobs/{job_id}/events/",
                params=kwargs,
                headers=self._auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
        )

        return [JobEvent.from_json(event) for event in response_data]

    #########################
    ####### Functions #######
    #########################

    def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
        """Upload a function to the gateway.

        Args:
            program: function to upload. It must define either ``image`` (custom
                image upload) or ``entrypoint`` (artifact upload); ``image`` takes
                precedence when both are set.

        Returns:
            The uploaded function as returned by the gateway.

        Raises:
            QiskitServerlessException: if neither ``entrypoint`` nor ``image`` is set.
        """
        tracer = trace.get_tracer("client.tracer")
        with tracer.start_as_current_span("function.upload") as span:
            span.set_attribute("function", program.title)
            url = f"{self.host}/api/{self.version}/programs/upload/"

            if program.image:
                # upload function with custom image
                function_uploaded = _upload_with_docker_image(
                    program=program,
                    url=url,
                    token=self.token,
                    span=span,
                    client=self,
                    instance=self.instance,
                    channel=self.channel,
                )
            elif program.entrypoint:
                # upload function with artifact
                function_uploaded = _upload_with_artifact(
                    program=program,
                    url=url,
                    token=self.token,
                    span=span,
                    client=self,
                    instance=self.instance,
                    channel=self.channel,
                )
            else:
                raise QiskitServerlessException("Function must either have `entrypoint` or `image` specified.")

        return function_uploaded

    @_trace_functions("list")
    def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
        """Returns list of available functions."""
        response_data = safe_json_request_as_list(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/programs",
                headers=self._auth_headers(),
                params=kwargs,
                timeout=REQUESTS_TIMEOUT,
            )
        )

        # Attach this client so each returned function can be run through it.
        for program_data in response_data:
            program_data["client"] = self

        return [RunnableQiskitFunction.from_json(program_data) for program_data in response_data]

    @_trace_functions("get_by_title")
    def function(self, title: str, provider: Optional[str] = None) -> Optional[RunnableQiskitFunction]:
        """Returns program based on parameters."""
        provider, title = format_provider_name_and_title(request_provider=provider, title=title)

        response_data = safe_json_request_as_dict(
            request=lambda: requests.get(
                f"{self.host}/api/{self.version}/programs/get_by_title/{title}",
                headers=self._auth_headers(),
                params={"provider": provider},
                timeout=REQUESTS_TIMEOUT,
            )
        )

        response_data["client"] = self
        the_function = RunnableQiskitFunction.from_json(response_data)
        return the_function

    #####################
    ####### FILES #######
    #####################

    def files(self, function: QiskitFunction) -> List[str]:
        """Returns the list of files available for the user in the Qiskit Function folder."""
        return self._files_client.list(function)

    def provider_files(self, function: QiskitFunction) -> List[str]:
        """Returns the list of files available for the provider in the Qiskit Function folder."""
        return self._files_client.provider_list(function)

    def file_download(
        self,
        file: str,
        function: QiskitFunction,
        target_name: Optional[str] = None,
        download_location: str = "./",
    ):
        """Download a file available to the user for the specific Qiskit Function."""
        return self._files_client.download(file, download_location, function, target_name)

    def provider_file_download(
        self,
        file: str,
        function: QiskitFunction,
        target_name: Optional[str] = None,
        download_location: str = "./",
    ):
        """Download a file available to the provider for the specific Qiskit Function."""
        return self._files_client.provider_download(file, download_location, function, target_name)

    def file_delete(self, file: str, function: QiskitFunction):
        """Deletes a file available to the user for the specific Qiskit Function."""
        return self._files_client.delete(file, function)

    def provider_file_delete(self, file: str, function: QiskitFunction):
        """Deletes a file available to the provider for the specific Qiskit Function."""
        return self._files_client.provider_delete(file, function)

    def file_upload(self, file: str, function: QiskitFunction):
        """Uploads a file in the specific user's Qiskit Function folder."""
        return self._files_client.upload(file, function)

    def provider_file_upload(self, file: str, function: QiskitFunction):
        """Uploads a file in the specific provider's Qiskit Function folder."""
        return self._files_client.provider_upload(file, function)


class IBMServerlessClient(ServerlessClient):
    """
    A client for connecting to the IBM serverless host.

    Credentials can be saved to disk by calling the `save_account()` method::

        from qiskit_serverless import IBMServerlessClient
        IBMServerlessClient.save_account(token=<INSERT_IBM_QUANTUM_TOKEN>, instance=<INSERT_CRN>)

    Once the credentials are saved, you can simply instantiate the client with no
    constructor args, as shown below::

        from qiskit_serverless import IBMServerlessClient
        client = IBMServerlessClient()

    Instead of saving credentials to disk, you can also set the environment variable
    ENV_GATEWAY_PROVIDER_TOKEN and then instantiate the client as below::

        from qiskit_serverless import IBMServerlessClient
        client = IBMServerlessClient()

    You can also enable an account just for the current session by instantiating the
    provider with the API token::

        from qiskit_serverless import IBMServerlessClient
        client = IBMServerlessClient(token=<INSERT_IBM_QUANTUM_TOKEN>, instance=<INSERT_CRN>)
    """

    def __init__(
        self,
        token: Optional[str] = None,
        name: Optional[str] = None,
        instance: Optional[str] = None,
        channel: Optional[str] = None,
        *,
        host: Optional[str] = None,
    ):
        """
        Initialize a client with access to an IBMQ-provided remote cluster.

        If a ``token`` is used to initialize an instance, the ``name`` argument
        will be ignored.

        If only a ``name`` is provided, the token for the named account will
        be retrieved from the user's local IBM Quantum account config file.

        If neither argument is provided, the token will be searched for in the
        environment variables and also in the local IBM Quantum account config
        file using the default account name.

        Args:
            host: host of gateway. Optional. It uses IBM_SERVERLESS_HOST_URL env var or IBM host
            token: IBM quantum token
            name: Name of the account to load
            instance: IBM Cloud CRN
            channel: identifies the method to use to authenticate the user
        """
        channel = channel or Channel.IBM_QUANTUM_PLATFORM.value  # For backwards compatibility

        # Initialize QiskitRuntimeService
        self._service = QiskitRuntimeService(channel=channel, token=token, name=name, instance=instance)
        self.account = self._service._account

        # Per-instance cache keyed by backend name; populated by backends(), backend()
        # and least_busy(). Instance-level (not class-level) to avoid cross-client leakage.
        self._backends_cache: Dict[str, Any] = {}
        # True only once an *unfiltered* listing has been stored, i.e. when the cache
        # is known to describe every accessible backend and not just a subset.
        self._backends_cache_complete: bool = False

        super().__init__(
            channel=self.account.channel,
            token=self.account.token,
            instance=self.account.instance,
            host=host if host else IBM_SERVERLESS_HOST_URL,
        )

    @staticmethod
    def save_account(
        token: Optional[str] = None,
        name: Optional[str] = None,
        overwrite: Optional[bool] = False,
        instance: Optional[str] = None,
        channel: Optional[str] = None,
    ) -> None:
        """
        Save the account to disk for future use.

        Args:
            token: IBM Quantum API token
            name: Name of the account to save
            overwrite: ``True`` if the existing account is to be overwritten
            instance: IBM Cloud CRN
            channel: identifies the method to use to authenticate the user

        Raises:
            QiskitServerlessException: If the runtime service rejects the inputs
                with ``InvalidAccountError``.
        """
        try:
            QiskitRuntimeService.save_account(
                token=token,
                name=name,
                overwrite=overwrite,
                instance=instance,
                channel=channel,
            )
        except InvalidAccountError as ex:
            raise QiskitServerlessException(f"Invalid format in account inputs - {ex}") from ex

    def usage(self) -> dict[str, Any]:
        """Return runtime usage information for the active instance.

        Exposes the underlying :meth:`QiskitRuntimeService.usage` method to
        retrieve the instance's runtime quota information, including
        ``usage_remaining_seconds`` and ``usage_limit_reached``.

        Returns:
            A dictionary containing usage information as reported by the runtime service.

        Raises:
            QiskitServerlessException: If the usage information cannot be retrieved.
        """
        try:
            return self._service.usage()
        except Exception as exc:  # pylint: disable=broad-except
            raise QiskitServerlessException(
                f"Failed to retrieve usage information for instance '{self.instance}': {exc}"
            ) from exc

    def backends(  # pylint: disable=too-many-positional-arguments
        self,
        refresh_cache: bool = False,
        name: str | None = None,
        min_num_qubits: int | None = None,
        filters: Callable[[IBMBackend], bool] | None = None,
        **kwargs: Any,
    ) -> list[IBMBackend]:
        """Return backends accessible through this instance.

        Exposes the underlying :meth:`QiskitRuntimeService.backends` method with
        per-instance caching. An unfiltered listing is cached after the first call
        and served from the cache afterwards; use ``refresh_cache=True`` to force a
        refresh. Calls that pass any filter always query the service, because the
        cache cannot answer a filtered query; their results are merged into the
        cache without marking it as a complete listing.

        Args:
            refresh_cache: If ``True``, refresh the cache by fetching backends from the service.
            name: Backend name to filter by.
            min_num_qubits: Minimum number of qubits the backend must have.
            filters: More complex filters, such as lambda functions.
                For example::

                    client.backends(filters=lambda b: b.max_shots > 50000)
                    client.backends(filters=lambda x: "rz" in x.basis_gates)

            **kwargs: Simple filters that require a specific value for an attribute in
                backend configuration or status.
                Examples::

                    # Get backends with at least 127 qubits
                    client.backends(min_num_qubits=127)

                For the full list of backend attributes, see the `IBMBackend class documentation
                <https://quantum.cloud.ibm.com/docs/api/qiskit-ibm-runtime/qiskit-runtime-service#backends>`_

        Returns:
            List of available backends that match the filter criteria.

        Raises:
            QiskitServerlessException: If the backend listing call fails.
        """
        is_filtered = name is not None or min_num_qubits is not None or filters is not None or bool(kwargs)

        # Serve from the cache only when it holds a full, unfiltered listing. Entries
        # added by backend()/least_busy() or by a filtered call are only a subset and
        # must not be returned as "all backends".
        cache_is_complete = getattr(self, "_backends_cache_complete", False)
        if not refresh_cache and not is_filtered and cache_is_complete and self._backends_cache:
            return list(self._backends_cache.values())

        try:
            backend_list = self._service.backends(
                name=name,
                min_num_qubits=min_num_qubits,
                filters=filters,
                **kwargs,
            )
        except Exception as exc:
            raise QiskitServerlessException(
                f"Failed to retrieve backends for instance '{self.instance}': {exc}"
            ) from exc

        if is_filtered:
            # Partial view: refresh the matching entries but keep the rest of the cache.
            for backend in backend_list:
                self._backends_cache[backend.name] = backend
        else:
            # Full view: drop cached backends that may no longer be accessible and
            # replace them with the current listing.
            self._backends_cache = {backend.name: backend for backend in backend_list}
            self._backends_cache_complete = True

        return backend_list

    def backend(self, name: str, **kwargs: Any) -> Backend:
        """Fetch a single backend by name and update the cache.

        Exposes the underlying :meth:`QiskitRuntimeService.backend` method to perform
        a targeted backend lookup. This is more efficient than listing all backends
        and validates that the caller has access to the requested backend.

        The cache is updated on every call to reflect current access permissions.

        Args:
            name: Name of the backend (e.g., ``"ibm_torino"``).
            **kwargs: Simple filters that require a specific value for an attribute in
                backend configuration or status.

                For the full list of backend attributes, see the `IBMBackend class documentation
                <https://quantum.cloud.ibm.com/docs/api/qiskit-ibm-runtime/qiskit-runtime-service#backend>`_

        Returns:
            Backend matching the specified name.

        Raises:
            QiskitServerlessException: If the backend is not found or is inaccessible.
        """
        try:
            backend = self._service.backend(name=name, **kwargs)
        except QiskitBackendNotFoundError as exc:
            raise QiskitServerlessException(
                f"Backend '{name}' is not available or you do not have access to it "
                f"with instance '{self.instance}'. "
                f"Call client.backends() to list accessible backends."
            ) from exc
        except Exception as exc:
            raise QiskitServerlessException(f"Failed to retrieve backend '{name}': {exc}") from exc

        self._backends_cache[name] = backend
        return backend

    def least_busy(
        self,
        min_num_qubits: int | None = None,
        filters: Callable[[IBMBackend], bool] | None = None,
        **kwargs: Any,
    ) -> IBMBackend:
        """Return the least busy backend matching the specified criteria.

        Exposes the underlying :meth:`QiskitRuntimeService.least_busy` method to find
        the backend with the fewest pending jobs. The result is cached in the instance.

        Args:
            min_num_qubits: Minimum number of qubits the backend must have.
            filters: Filters can be defined as for the :meth:`backends` method.
                Example::

                    client.least_busy(min_num_qubits=5, operational=True)

            **kwargs: Additional filters for backend configuration or status attributes.

        Returns:
            The backend with the fewest number of pending jobs that matches the criteria.

        Raises:
            QiskitServerlessException: If no backend matches the criteria or the call fails.
        """
        try:
            backend = self._service.least_busy(
                min_num_qubits=min_num_qubits,
                filters=filters,
                **kwargs,
            )
        except QiskitBackendNotFoundError as exc:
            raise QiskitServerlessException(
                f"No available backend matches the criteria with instance '{self.instance}'. "
                f"Call client.backends() to list accessible backends."
            ) from exc
        except Exception as exc:
            raise QiskitServerlessException(f"Failed to retrieve backend: {exc}") from exc

        self._backends_cache[backend.name] = backend
        return backend

    def _check_usage(self, suppress_low_usage_warning: bool = False) -> None:
        """Check instance runtime quota and warn or raise if insufficient.

        Retrieves ``usage_remaining_seconds`` from :meth:`usage`. If the key is
        absent and the limit is not flagged as reached, no action is taken.
        Errors from the usage endpoint are downgraded to a warning to avoid
        blocking job submission.

        Thresholds are configurable via environment variables (see constants.py):

        - ``USAGE_ZERO_EPSILON_SECONDS`` (default 1 s): Raises an exception when
          remaining time is at or below this threshold.
        - ``USAGE_LOW_THRESHOLD_SECONDS`` (default 600 s): Emits a warning when
          remaining time is at or below this threshold (unless suppressed).

        Args:
            suppress_low_usage_warning: If ``True``, suppress the warning when remaining
                quota is at or below ``USAGE_LOW_THRESHOLD_SECONDS``. The exception for
                exhausted quota (at or below ``USAGE_ZERO_EPSILON_SECONDS``) is still raised.

        Raises:
            QiskitServerlessException: When remaining quota is at or below
                ``USAGE_ZERO_EPSILON_SECONDS``, or when ``usage_limit_reached`` is ``True``
                with no remaining time recorded.
        """
        try:
            usage = self.usage()
        except Exception as exc:  # pylint: disable=broad-except
            warnings.warn(
                f"Could not retrieve usage information for instance '{self.instance}': {exc}. "
                "Proceeding with job submission; verify your instance quota manually."
            )
            return

        remaining = usage.get("usage_remaining_seconds")
        limit_reached = usage.get("usage_limit_reached", False)

        if remaining is None and not limit_reached:
            return

        # usage_limit_reached without a recorded remaining value → treat as zero.
        if limit_reached and remaining is None:
            remaining = 0.0

        if remaining is not None and remaining <= USAGE_ZERO_EPSILON_SECONDS:
            raise QiskitServerlessException(
                f"Instance '{self.instance}' has no remaining runtime quota "
                f"(remaining: {remaining:.2f}s). "
                "The job would wait indefinitely for runtime resources. "
                "Check your instance quota at https://quantum.cloud.ibm.com/instances."
            )

        if not suppress_low_usage_warning and remaining is not None and remaining <= USAGE_LOW_THRESHOLD_SECONDS:
            warnings.warn(
                f"Instance '{self.instance}' has low remaining runtime quota "
                f"({remaining:.0f}s remaining, threshold: {USAGE_LOW_THRESHOLD_SECONDS:.0f}s). "
                "Consider checking your quota before submitting long-running jobs. "
                "Check https://quantum.cloud.ibm.com/instances for details."
            )

    def run(  # pylint: disable=too-many-positional-arguments
        self,
        program: Union[QiskitFunction, str],
        arguments: Optional[Dict[str, Any]] = None,
        config: Optional[Configuration] = None,
        provider: Optional[str] = None,
        *,
        compute_profile: Optional[str] = None,
        suppress_low_usage_warning: bool = False,
    ) -> "Job":
        """Run a Qiskit Function with pre-flight validation before submitting to the gateway.

        Checks are performed before delegating to ``ServerlessClient.run``: the
        instance runtime quota is checked (raising when exhausted, warning when
        low) and access to a backend is verified.

        Args:
            program: ``QiskitFunction`` object or function title string.
            arguments: Runtime arguments. The optional ``backend_name`` key is
                inspected for the backend access check but forwarded unchanged.
            config: Optional execution configuration.
            provider: Optional provider name override.
            compute_profile: Optional compute-profile name.
            suppress_low_usage_warning: If ``True``, suppress the warning when remaining
                runtime quota is at or below ``USAGE_LOW_THRESHOLD_SECONDS``. The exception
                for exhausted quota (at or below ``USAGE_ZERO_EPSILON_SECONDS``) is still raised.

        Returns:
            :class:`~qiskit_serverless.core.job.Job` handle for the submitted run.

        Raises:
            QiskitServerlessException: If quota is exhausted, the backend is
                inaccessible, or the instance has no available backends.
        """
        backend_name = (arguments or {}).get("backend_name")

        self._check_usage(suppress_low_usage_warning)

        if backend_name:
            # Single-backend lookup — fast and confirms access without a full instance listing.
            self.backend(backend_name)
        elif not (self._backends_cache or self.backends()):
            # No specific backend requested; verify the instance exposes at least one.
            # Any cached backend already proves that, so the listing is fetched only
            # when the cache is empty.
            raise QiskitServerlessException(
                f"Instance '{self.instance}' has no backends available. "
                "Cannot run a Qiskit Function without an accessible backend. "
                "Check your instance configuration at https://quantum.cloud.ibm.com/instances."
            )

        return super().run(
            program=program,
            arguments=arguments,
            config=config,
            provider=provider,
            compute_profile=compute_profile,
        )


def _upload_with_docker_image(  # pylint: disable=too-many-positional-arguments
    program: QiskitFunction,
    url: str,
    token: str,
    span: Any,
    client: RunService,
    instance: Optional[str],
    channel: Optional[str],
) -> RunnableQiskitFunction:
    """Upload a function that is backed by a custom docker image.

    Args:
        program (QiskitFunction): function instance
        url (str): upload gateway url
        token (str): auth token
        span (Any): tracing span; receives the ``function.title`` and
            ``function.provider`` attributes from the gateway response
        client (RunService): client stored in the response data under the
            ``"client"`` key before the result object is built
        instance (Optional[str]): IBM Cloud crn
        channel (Optional[str]): channel passed to ``get_headers``

    Returns:
        RunnableQiskitFunction: function built from the gateway response
    """
    response_data = safe_json_request_as_dict(
        request=lambda: requests.post(
            url=url,
            data={
                "title": program.title,
                "provider": program.provider,
                "image": program.image,
                "runner": program.runner,
                "arguments": json.dumps({}),
                "dependencies": json.dumps(program.dependencies or []),
                "env_vars": json.dumps(program.env_vars or {}),
                "description": program.description,
                "version": program.version,
            },
            headers=get_headers(token=token, instance=instance, channel=channel),
            timeout=REQUESTS_TIMEOUT,
        )
    )
    span.set_attribute("function.title", response_data.get("title", "na"))
    span.set_attribute("function.provider", response_data.get("provider", "na"))
    response_data["client"] = client
    return RunnableQiskitFunction.from_json(response_data)


def _upload_with_artifact(  # pylint: disable=too-many-positional-arguments, too-many-locals
    program: QiskitFunction,
    url: str,
    token: str,
    span: Any,
    client: RunService,
    instance: Optional[str],
    channel: Optional[str],
) -> RunnableQiskitFunction:
    """Upload a function by packing its working directory into a tar artifact.

    The top-level entries of ``program.working_dir`` are archived into a
    temporary ``artifact.tar`` inside that directory, the archive is posted
    to the gateway, and the temporary file is removed afterwards whether or
    not the upload succeeded.

    Args:
        program (QiskitFunction): function instance
        url (str): endpoint for gateway upload
        token (str): auth token
        span (Any): tracing span; receives the ``function.title`` and
            ``function.provider`` attributes from the gateway response
        client (RunService): client stored in the response data under the
            ``"client"`` key before the result object is built
        instance (Optional[str]): IBM Cloud crn
        channel (Optional[str]): channel passed to ``get_headers``

    Raises:
        QiskitServerlessException: if the entrypoint is missing, empty or
            absolute, if the artifact is too large, or if packing/uploading
            fails for any other reason.

    Returns:
        RunnableQiskitFunction: function built from the gateway response
    """
    artifact_file_path = os.path.join(program.working_dir, "artifact.tar")
    entrypoint = program.entrypoint

    # An empty/None entrypoint is rejected first: otherwise
    # ``os.path.join(working_dir, "")`` resolves to the directory itself
    # (which exists) and indexing ``entrypoint[0]`` raises IndexError, while
    # ``None`` makes ``os.path.join`` raise TypeError.
    if (
        not entrypoint
        or not os.path.exists(os.path.join(program.working_dir, entrypoint))
        or entrypoint[0] == "/"
    ):
        raise QiskitServerlessException(
            f"Entrypoint file [{program.entrypoint}] does not exist "
            f"in [{program.working_dir}] working directory."
        )

    try:
        with tarfile.open(artifact_file_path, "w", dereference=True) as tar:
            for filename in os.listdir(program.working_dir):
                fpath = os.path.join(program.working_dir, filename)
                # The archive lives inside working_dir; never add it to itself.
                if fpath == artifact_file_path:
                    continue
                tar.add(fpath, arcname=filename)

        # check file size
        size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2
        if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB:
            raise QiskitServerlessException(
                f"{artifact_file_path} is {int(size_in_mb)} Mb, "
                f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. "
                f"Try to reduce size of `working_dir`."
            )

        with open(artifact_file_path, "rb") as file:
            response_data = safe_json_request_as_dict(
                request=lambda: requests.post(
                    url=url,
                    data={
                        "title": program.title,
                        "provider": program.provider,
                        "entrypoint": program.entrypoint,
                        "runner": program.runner,
                        "arguments": json.dumps({}),
                        "dependencies": json.dumps(program.dependencies or []),
                        "env_vars": json.dumps(program.env_vars or {}),
                        "description": program.description,
                        "version": program.version,
                    },
                    files={"artifact": file},
                    headers=get_headers(token=token, instance=instance, channel=channel),
                    timeout=REQUESTS_TIMEOUT,
                )
            )
            span.set_attribute("function.title", response_data.get("title", "na"))
            span.set_attribute("function.provider", response_data.get("provider", "na"))
            response_data["client"] = client
            response_function = RunnableQiskitFunction.from_json(response_data)
    except QiskitServerlessException:
        # Already the documented error type: re-raise with its traceback intact.
        raise
    except Exception as error:  # pylint: disable=broad-exception-caught
        # Wrap with a message so the cause is visible without walking __cause__.
        raise QiskitServerlessException(
            f"Failed to upload artifact for function [{program.title}]: {error}"
        ) from error
    finally:
        # Clean up the temporary archive on success and on failure alike.
        if os.path.exists(artifact_file_path):
            os.remove(artifact_file_path)

    return response_function