Source code for qiskit_serverless.core.clients.serverless_client

# This code is a Qiskit project.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
================================================
Provider (:mod:`qiskit_serverless.core.client`)
================================================

.. currentmodule:: qiskit_serverless.core.client

Qiskit Serverless provider
===========================

.. autosummary::
    :toctree: ../stubs/

    ServerlessClient
"""
# pylint: disable=duplicate-code
import json
import os.path
import os
import re
import tarfile
from pathlib import Path
from dataclasses import asdict
from typing import Optional, List, Dict, Any, Union

import requests
from opentelemetry import trace
from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.constants import (
    REQUESTS_TIMEOUT,
    ENV_GATEWAY_PROVIDER_HOST,
    ENV_GATEWAY_PROVIDER_VERSION,
    ENV_GATEWAY_PROVIDER_TOKEN,
    GATEWAY_PROVIDER_VERSION_DEFAULT,
    IBM_SERVERLESS_HOST_URL,
    MAX_ARTIFACT_FILE_SIZE_MB,
)
from qiskit_serverless.core.client import BaseClient
from qiskit_serverless.core.files import GatewayFilesClient
from qiskit_serverless.core.job import (
    Job,
    Configuration,
)
from qiskit_serverless.core.function import (
    QiskitFunction,
    RunService,
    RunnableQiskitFunction,
)

from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.utils.json import (
    safe_json_request_as_dict,
    safe_json_request_as_list,
    safe_json_request,
)
from qiskit_serverless.utils.formatting import format_provider_name_and_title
from qiskit_serverless.serializers.program_serializers import (
    QiskitObjectsEncoder,
    QiskitObjectsDecoder,
)


[docs]class ServerlessClient(BaseClient): """ A client for connecting to a specified host. Example: >>> client = ServerlessClient( >>> name="<NAME>", >>> host="<HOST>", >>> token="<TOKEN>", >>> ) """
[docs] def __init__( # pylint: disable=too-many-positional-arguments self, name: Optional[str] = None, host: Optional[str] = None, version: Optional[str] = None, token: Optional[str] = None, verbose: bool = False, ): """ Initializes the ServerlessClient instance. Args: name: name of client host: host of gateway version: version of gateway token: authorization token """ name = name or "gateway-client" host = host or os.environ.get(ENV_GATEWAY_PROVIDER_HOST) if host is None: raise QiskitServerlessException("Please provide `host` of gateway.") version = version or os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) if version is None: version = GATEWAY_PROVIDER_VERSION_DEFAULT token = token or os.environ.get(ENV_GATEWAY_PROVIDER_TOKEN) if token is None: raise QiskitServerlessException( "Authentication credentials must be provided in form of `token`." ) super().__init__(name, host, token) self.verbose = verbose self.version = version self._verify_token(token) self._files_client = GatewayFilesClient(self.host, self.token, self.version)
@classmethod def from_dict(cls, dictionary: dict): return ServerlessClient(**dictionary) def _verify_token(self, token: str): """Verify token.""" try: safe_json_request( request=lambda: requests.get( url=f"{self.host}/api/v1/programs/", headers={"Authorization": f"Bearer {token}"}, timeout=REQUESTS_TIMEOUT, ), verbose=self.verbose, ) except QiskitServerlessException as reason: raise QiskitServerlessException("Cannot verify token.") from reason #################### ####### JOBS ####### #################### def jobs(self, **kwargs) -> List[Job]: tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.list"): limit = kwargs.get("limit", 10) kwargs["limit"] = limit offset = kwargs.get("offset", 0) kwargs["offset"] = offset response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/jobs", params=kwargs, headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, ) ) return [ Job(job.get("id"), job_service=self, raw_data=job) for job in response_data.get("results", []) ] def job(self, job_id: str) -> Optional[Job]: tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.get"): url = f"{self.host}/api/{self.version}/jobs/{job_id}/" response_data = safe_json_request_as_dict( request=lambda: requests.get( url, headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, ) ) job = None job_id = response_data.get("id") if job_id is not None: job = Job( job_id=job_id, job_service=self, ) return job def run( self, program: Union[QiskitFunction, str], arguments: Optional[Dict[str, Any]] = None, config: Optional[Configuration] = None, provider: Optional[str] = None, ) -> Job: if isinstance(program, QiskitFunction): title = program.title provider = program.provider else: title = str(program) tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.run") as span: span.set_attribute("program", title) span.set_attribute("provider", provider) span.set_attribute("arguments", str(arguments)) url = f"{self.host}/api/{self.version}/programs/run/" data = { "title": title, "provider": provider, "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), } # type: Dict[str, Any] if config: data["config"] = asdict(config) else: data["config"] = asdict(Configuration()) response_data = safe_json_request_as_dict( request=lambda: requests.post( url=url, json=data, headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, ) ) job_id = response_data.get("id") span.set_attribute("job.id", job_id) return Job(job_id, job_service=self) def status(self, job_id: str): tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.status"): default_status = "Unknown" response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/jobs/{job_id}/", headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, ) ) return response_data.get("status", default_status) def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.stop"): if service: data = { "service": json.dumps(service, cls=QiskitObjectsEncoder), } else: data = { "service": None, } response_data = safe_json_request_as_dict( request=lambda: requests.post( f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, json=data, ) ) return response_data.get("message") def result(self, job_id: str): tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.result"): response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/jobs/{job_id}/", headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, ) ) return json.loads( response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder ) def logs(self, job_id: str): tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.logs"): response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", headers={"Authorization": f"Bearer {self.token}"}, timeout=REQUESTS_TIMEOUT, ) ) return response_data.get("logs") def filtered_logs(self, job_id: str, **kwargs): all_logs = self.logs(job_id=job_id) included = "" include = kwargs.get("include") if include is not None: for line in all_logs.split("\n"): if re.search(include, line) is not None: included = included + line + "\n" else: included = all_logs excluded = "" exclude = kwargs.get("exclude") if exclude is not None: for line in included.split("\n"): if line != "" and re.search(exclude, line) is None: excluded = excluded + line + "\n" else: excluded = included return excluded ######################### ####### Functions ####### ######################### def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.run") as span: span.set_attribute("program", program.title) url = f"{self.host}/api/{self.version}/programs/upload/" if program.image is not None: # upload function with custom image function_uploaded = _upload_with_docker_image( program=program, url=url, token=self.token, span=span, client=self ) elif program.entrypoint is not None: # upload funciton with artifact function_uploaded = _upload_with_artifact( program=program, url=url, token=self.token, span=span, client=self ) else: raise QiskitServerlessException( "Function must either have `entrypoint` or `image` specified." ) return function_uploaded def functions(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of available programs.""" tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("program.list"): response_data = safe_json_request_as_list( request=lambda: requests.get( f"{self.host}/api/{self.version}/programs", headers={"Authorization": f"Bearer {self.token}"}, params=kwargs, timeout=REQUESTS_TIMEOUT, ) ) return [ RunnableQiskitFunction( client=self, title=program.get("title"), provider=program.get("provider", None), raw_data=program, description=program.get("description"), ) for program in response_data ] def function( self, title: str, provider: Optional[str] = None ) -> Optional[RunnableQiskitFunction]: """Returns program based on parameters.""" provider, title = format_provider_name_and_title( request_provider=provider, title=title ) tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("program.get_by_title"): response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/programs/get_by_title/{title}", headers={"Authorization": f"Bearer {self.token}"}, params={"provider": provider}, timeout=REQUESTS_TIMEOUT, ) ) return RunnableQiskitFunction( client=self, title=response_data.get("title"), provider=response_data.get("provider", None), raw_data=response_data, ) ##################### ####### FILES ####### ##################### def files(self, provider: Optional[str] = None) -> List[str]: """Returns list of available files produced by programs to download.""" return self._files_client.list(provider) def file_download( self, file: str, target_name: Optional[str] = None, download_location: str = "./", provider: Optional[str] = None, ): """Download file.""" return self._files_client.download( file, download_location, target_name, provider ) def file_delete(self, file: str, provider: Optional[str] = None): """Deletes file uploaded or produced by the programs,""" return self._files_client.delete(file, provider) def file_upload(self, file: str, provider: Optional[str] = None): """Upload file.""" return self._files_client.upload(file, provider)
[docs]class IBMServerlessClient(ServerlessClient): """ A client for connecting to the IBM serverless host. Credentials can be saved to disk by calling the `save_account()` method:: from qiskit_serverless import IBMServerlessClient IBMServerlessClient.save_account(token=<INSERT_IBM_QUANTUM_TOKEN>) Once the credentials are saved, you can simply instantiate the client with no constructor args, as shown below. from qiskit_serverless import IBMServerlessClient client = IBMServerlessClient() Instead of saving credentials to disk, you can also set the environment variable ENV_GATEWAY_PROVIDER_TOKEN and then instantiate the client as below:: from qiskit_serverless import IBMServerlessClient client = IBMServerlessClient() You can also enable an account just for the current session by instantiating the provider with the API token:: from qiskit_serverless import IBMServerlessClient client = IBMServerlessClient(token=<INSERT_IBM_QUANTUM_TOKEN>) """
[docs] def __init__(self, token: Optional[str] = None, name: Optional[str] = None): """ Initialize a client with access to an IBMQ-provided remote cluster. If a ``token`` is used to initialize an instance, the ``name`` argument will be ignored. If only a ``name`` is provided, the token for the named account will be retrieved from the user's local IBM Quantum account config file. If neither argument is provided, the token will be searched for in the environment variables and also in the local IBM Quantum account config file using the default account name. Args: token: IBM quantum token name: Name of the account to load """ token = token or QiskitRuntimeService(name=name).active_account().get("token") super().__init__(token=token, host=IBM_SERVERLESS_HOST_URL)
@staticmethod def save_account( token: Optional[str] = None, name: Optional[str] = None, overwrite: Optional[bool] = False, ) -> None: """ Save the account to disk for future use. Args: token: IBM Quantum API token name: Name of the account to save overwrite: ``True`` if the existing account is to be overwritten """ QiskitRuntimeService.save_account(token=token, name=name, overwrite=overwrite)
def _upload_with_docker_image( program: QiskitFunction, url: str, token: str, span: Any, client: RunService ) -> RunnableQiskitFunction: """Uploads function with custom docker image. Args: program (QiskitFunction): function instance url (str): upload gateway url token (str): auth token span (Any): tracing span Returns: str: uploaded function name """ response_data = safe_json_request_as_dict( request=lambda: requests.post( url=url, data={ "title": program.title, "provider": program.provider, "image": program.image, "arguments": json.dumps({}), "dependencies": json.dumps(program.dependencies or []), "env_vars": json.dumps(program.env_vars or {}), "description": program.description, }, headers={"Authorization": f"Bearer {token}"}, timeout=REQUESTS_TIMEOUT, ) ) program_title = response_data.get("title", "na") program_provider = response_data.get("provider", "na") span.set_attribute("program.title", program_title) span.set_attribute("program.provider", program_provider) response_data["client"] = client return RunnableQiskitFunction.from_json(response_data) def _upload_with_artifact( program: QiskitFunction, url: str, token: str, span: Any, client: RunService ) -> RunnableQiskitFunction: """Uploads function with artifact. Args: program (QiskitFunction): function instance url (str): endpoint for gateway upload token (str): auth token span (Any): tracing span Raises: QiskitServerlessException: if no entrypoint or size of artifact is too large. Returns: str: uploaded function name """ artifact_file_path = os.path.join(program.working_dir, "artifact.tar") # check if entrypoint exists if ( not os.path.exists(os.path.join(program.working_dir, program.entrypoint)) or program.entrypoint[0] == "/" ): raise QiskitServerlessException( f"Entrypoint file [{program.entrypoint}] does not exist " f"in [{program.working_dir}] working directory." ) try: with tarfile.open(artifact_file_path, "w", dereference=True) as tar: for filename in os.listdir(program.working_dir): fpath = os.path.join(program.working_dir, filename) tar.add(fpath, arcname=filename) # check file size size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2 if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB: raise QiskitServerlessException( f"{artifact_file_path} is {int(size_in_mb)} Mb, " f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. " f"Try to reduce size of `working_dir`." ) with open(artifact_file_path, "rb") as file: response_data = safe_json_request_as_dict( request=lambda: requests.post( url=url, data={ "title": program.title, "provider": program.provider, "entrypoint": program.entrypoint, "arguments": json.dumps({}), "dependencies": json.dumps(program.dependencies or []), "env_vars": json.dumps(program.env_vars or {}), "description": program.description, }, files={"artifact": file}, headers={"Authorization": f"Bearer {token}"}, timeout=REQUESTS_TIMEOUT, ) ) span.set_attribute("program.title", response_data.get("title", "na")) span.set_attribute("program.provider", response_data.get("provider", "na")) response_data["client"] = client response_function = RunnableQiskitFunction.from_json(response_data) except Exception as error: # pylint: disable=broad-exception-caught raise QiskitServerlessException from error finally: if os.path.exists(artifact_file_path): os.remove(artifact_file_path) return response_function