Source code for qiskit_serverless.core.clients.ray_client

# This code is a Qiskit project.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
================================================
Provider (:mod:`qiskit_serverless.core.client`)
================================================

.. currentmodule:: qiskit_serverless.core.client

Qiskit Serverless provider
===========================

.. autosummary::
    :toctree: ../stubs/

    RayClient
"""
# pylint: disable=duplicate-code
import json
from typing import Optional, List, Dict, Any, Union
from uuid import uuid4

from ray.dashboard.modules.job.sdk import JobSubmissionClient
from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.constants import (
    OT_PROGRAM_NAME,
    ENV_JOB_ARGUMENTS,
)
from qiskit_serverless.core.job import (
    Configuration,
    Job,
)
from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction
from qiskit_serverless.core.local_functions_store import LocalFunctionsStore
from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.serializers.program_serializers import (
    QiskitObjectsEncoder,
)

from qiskit_serverless.core.client import BaseClient


class RayClient(BaseClient):
    """Client that submits and manages jobs through a Ray job submission endpoint.

    Functions are registered in a ``LocalFunctionsStore`` bound to this client
    and executed via Ray's ``JobSubmissionClient``.
    """

    def __init__(self, host: str):
        """Ray client

        Args:
            host: ray head node host

        Example:
            >>> ray_provider = RayClient("http://localhost:8265")
        """
        super().__init__("ray-client", host)
        self.job_submission_client = JobSubmissionClient(host)
        self._functions = LocalFunctionsStore(self)

    @classmethod
    def from_dict(cls, dictionary: dict):
        """Build a client from a dictionary of constructor keyword arguments.

        Args:
            dictionary: mapping unpacked as keyword arguments of the constructor
                (e.g. ``{"host": "http://localhost:8265"}``).

        Returns:
            A new instance of the class this method is called on.
        """
        # Use ``cls`` rather than a hard-coded class name so that subclasses
        # calling ``from_dict`` get an instance of their own type.
        return cls(**dictionary)

    ####################
    ####### JOBS #######
    ####################

    def jobs(self, **kwargs) -> List[Job]:
        """Return list of jobs.

        Args:
            **kwargs: accepted but not used by this implementation.

        Returns:
            list of jobs.
        """
        # NOTE(review): this uses ``job_id`` from the listed entries, whereas
        # ``job()`` uses ``submission_id`` -- confirm both identify the same job.
        return [
            Job(job.job_id, job_service=self)
            for job in self.job_submission_client.list_jobs()
        ]

    def job(self, job_id: str) -> Optional[Job]:
        """Returns job by job id.

        Args:
            job_id: job id

        Returns:
            Job instance
        """
        job_info = self.job_submission_client.get_job_info(job_id)
        return Job(job_info.submission_id, job_service=self)

    def run(
        self,
        program: Union[QiskitFunction, str],
        arguments: Optional[Dict[str, Any]] = None,
        config: Optional[Configuration] = None,
    ) -> Job:
        """Submit a previously uploaded function as a Ray job.

        Args:
            program: the function to run, or its title as a string.
            arguments: arguments for the job; serialized to JSON with
                ``QiskitObjectsEncoder`` and passed through the
                ``ENV_JOB_ARGUMENTS`` environment variable. Defaults to ``{}``.
            config: accepted but not used by this implementation.

        Returns:
            Job bound to this client, identified by the id returned from
            the job submission client.

        Raises:
            QiskitServerlessException: if no function with the given title
                is found via ``self.function``.
        """
        # pylint: disable=too-many-locals
        title = program.title if isinstance(program, QiskitFunction) else str(program)

        saved_program = self.function(title)
        if not saved_program:
            raise QiskitServerlessException(
                "QiskitFunction provided is not uploaded to the client. Use upload() first."
            )

        arguments = arguments or {}
        entrypoint = f"python {saved_program.entrypoint}"

        # Function-level env vars come first so that the two reserved keys
        # below always take precedence.
        # set program name so OT can use it as parent span name
        env_vars = {
            **(saved_program.env_vars or {}),
            OT_PROGRAM_NAME: saved_program.title,
            ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder),
        }

        job_id = self.job_submission_client.submit_job(
            entrypoint=entrypoint,
            submission_id=f"qs_{uuid4()}",
            runtime_env={
                "working_dir": saved_program.working_dir,
                "pip": saved_program.dependencies,
                "env_vars": env_vars,
            },
        )
        return Job(job_id=job_id, job_service=self)

    def status(self, job_id: str) -> str:
        """Check status.

        Args:
            job_id: job id

        Returns:
            The ``value`` of the status reported by the job submission client.
        """
        return self.job_submission_client.get_job_status(job_id).value

    def stop(
        self, job_id: str, service: Optional[QiskitRuntimeService] = None
    ) -> Union[str, bool]:
        """Stops job/program.

        Args:
            job_id: job id
            service: accepted but not used by this implementation.

        Returns:
            Whatever the job submission client's ``stop_job`` returns.
        """
        return self.job_submission_client.stop_job(job_id)

    def result(self, job_id: str) -> Any:
        """Return results.

        This implementation returns the same value as :meth:`logs`.
        """
        return self.logs(job_id)

    def logs(self, job_id: str) -> str:
        """Return logs.

        Args:
            job_id: job id

        Returns:
            Logs as returned by the job submission client.
        """
        return self.job_submission_client.get_job_logs(job_id)

    def filtered_logs(self, job_id: str, **kwargs) -> str:
        """Return filtered logs.

        Raises:
            NotImplementedError: always; not implemented for this client.
        """
        raise NotImplementedError

    #########################
    ####### Functions #######
    #########################

    def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
        """Uploads program.

        Args:
            program: function to register in this client's functions store.

        Returns:
            Result of the store's ``upload`` call.
        """
        return self._functions.upload(program)

    def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
        """Returns list of available programs.

        Args:
            **kwargs: accepted but not used by this implementation.
        """
        return self._functions.functions()

    def function(
        self, title: str, provider: Optional[str] = None
    ) -> Optional[RunnableQiskitFunction]:
        """Returns program based on parameters.

        Args:
            title: title of the function to look up in the functions store.
            provider: accepted but not used by this implementation.
        """
        return self._functions.function(title)