{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel workflows with QiskitFunction\n", "\n", "In this tutorial, we’ll explore how to run quantum workloads in parallel using Qiskit Serverless. The function defines a distributed task that transpiles a single circuit for a chosen backend; the main body then launches one task per circuit, gathers the transpiled circuits, and samples them in a single batch.\n", "\n", "Instead of executing a single circuit like in [Tutorial 02](./02_arguments_and_results.ipynb), we’ll pass a list of circuits and use the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator to transpile each one concurrently across distributed compute resources. Then, we will send them in one batch for execution.\n", "\n", "## Prep work\n", "To exploit parallelism, the function we write needs a task that can run independently of the main thread. Transpiling one circuit is independent of transpiling the others, so we will make transpilation our parallel task.\n", "\n", "The function accepts:\n", " - A list of circuits to run.\n", " - A backend name (either a simulator or a real device).\n", " - An optional service object for accessing IBM Quantum backends.\n", "\n", "The function outputs:\n", " - The measurement counts for each circuit.\n", "\n", "\n", "### Write the function\n", "\n", "Let's take a look at the function file [./source_files/function_with_parallel_workflow.py](./source_files/function_with_parallel_workflow.py). 
\n", "\n", "```python\n", "\"\"\"Function with a parallel workflow for the Jupyter notebook tutorial.\"\"\"\n", "import os\n", "from qiskit import QuantumCircuit\n", "from qiskit.providers import BackendV2\n", "from qiskit.providers.exceptions import QiskitBackendNotFoundError\n", "from qiskit.transpiler import generate_preset_pass_manager\n", "from qiskit_ibm_runtime import QiskitRuntimeService\n", "from qiskit_ibm_runtime.fake_provider import FakeProviderForBackendV2\n", "from qiskit_ibm_runtime import SamplerV2 as Sampler\n", "from qiskit_serverless import get_arguments, save_result, distribute_task, get\n", "\n", "\n", "@distribute_task()\n", "def distributed_transpilation(\n", " circuit_idx: int, circuit: QuantumCircuit, target_backend: BackendV2\n", "):\n", " \"\"\"Distributed task that returns an ISA circuit ready for execution.\"\"\"\n", " print(\n", " f\"[distributed_transpilation] Start (index={circuit_idx}, \"\n", " f\"qubits={circuit.num_qubits}, clbits={circuit.num_clbits}, backend={target_backend.name})\"\n", " )\n", " pm = generate_preset_pass_manager(backend=target_backend, optimization_level=3)\n", " isa_circuit = pm.run(circuit)\n", " print(f\"[distributed_transpilation] Transpilation complete (index={circuit_idx})\")\n", " return isa_circuit\n", "\n", "\n", "# ----- parse inputs -----\n", "# get all arguments passed to this function\n", "print(\"[main] Parsing arguments...\")\n", "arguments = get_arguments()\n", "\n", "# Extract inputs we care about\n", "circuits = arguments.get(\"circuits\")\n", "backend_name = arguments.get(\"backend_name\")\n", "service = arguments.get(\"service\")\n", "\n", "# Normalize inputs\n", "if circuits is None:\n", " raise ValueError(\n", " \"`circuits` is required and must be a QuantumCircuit or a list of them.\"\n", " )\n", "if not isinstance(circuits, list):\n", " circuits = [circuits]\n", "\n", "# Basic validation\n", "if not all(isinstance(circuit, QuantumCircuit) for circuit in circuits):\n", " raise ValueError(\"`circuits` 
must be a list of qiskit.QuantumCircuit objects.\")\n", "if not isinstance(backend_name, str) or len(backend_name) == 0:\n", " raise ValueError(\"backend_name must be a non-empty string.\")\n", "\n", "print(\n", " f\"[main] Inputs received (num_circuits={len(circuits)}, backend_name={backend_name})\"\n", ")\n", "\n", "# ----- resolve provider / backend -----\n", "# Choose a provider: fake provider for local testing, or a real service\n", "if \"fake\" in backend_name.lower():\n", " print(\n", " \"[main] Using fake provider (auto-selected because backend_name contains 'fake').\"\n", " )\n", " service = FakeProviderForBackendV2()\n", "\n", "if isinstance(service, (FakeProviderForBackendV2, QiskitRuntimeService)):\n", " try:\n", " backend = service.backend(backend_name)\n", " print(f\"[main] Backend resolved (name={backend.name})\")\n", " except QiskitBackendNotFoundError as e:\n", " raise ValueError(f\"Error retrieving backend {backend_name}: {e}\") from e\n", "else:\n", " # Fallback: build a Runtime service from environment variables\n", " print(\n", " \"[main] No service provided and backend not fake; \"\n", " \"attempting to initialize QiskitRuntimeService from environment variables...\"\n", " )\n", " try:\n", " service = QiskitRuntimeService(\n", " channel=os.environ.get(\"QISKIT_IBM_CHANNEL\"),\n", " token=os.environ.get(\"QISKIT_IBM_TOKEN\"),\n", " instance=os.environ.get(\"QISKIT_IBM_INSTANCE\"),\n", " url=os.environ.get(\"QISKIT_IBM_URL\"),\n", " )\n", " backend = service.backend(backend_name)\n", " print(\n", " f\"[main] Runtime service initialized from env and backend \"\n", " f\"resolved (name={backend.name})\"\n", " )\n", " except QiskitBackendNotFoundError as e:\n", " raise ValueError(f\"The backend named {backend_name} couldn't be found.\") from e\n", " except Exception as e:\n", " raise ValueError(\n", " f\"`QiskitRuntimeService` couldn't be initialized from OS environment variables: {e}.\"\n", " ) from e\n", "\n", "\n", "# ----- launch parallel tasks 
-----\n", "\n", "# get task references (async, parallel on the serverless cluster)\n", "print(f\"[main] Launching distributed transpilation tasks (count={len(circuits)})...\")\n", "# pass each circuit's index along so the task can include it in log messages\n", "transpilation_task_references = [\n", " distributed_transpilation(idx, circuit, backend) for idx, circuit in enumerate(circuits)\n", "]\n", "\n", "# ----- collect ISA circuits -----\n", "# collect all results (blocks until all tasks complete)\n", "print(\"[main] Waiting for transpilation tasks to finish...\")\n", "isa_circuits = get(transpilation_task_references)\n", "print(f\"[main] All transpilation tasks completed (isa_count={len(isa_circuits)})\")\n", "\n", "# ----- batch execute on the quantum computer -----\n", "print(f\"[main] Executing circuits on backend (name={backend.name})...\")\n", "pub_results = Sampler(backend).run(isa_circuits).result()\n", "print(\"[main] Circuit execution completed\")\n", "\n", "print(\"[main] Extracting counts from results...\")\n", "results = [r.data.meas.get_counts() for r in pub_results]\n", "\n", "# ----- persist results -----\n", "# persist results so `job.result()` returns them\n", "save_result({\"results\": results})\n", "print(\n", " \"[main] Results saved (len(results) = \"\n", " f\"{len(results)}; example_keys={list(results[0].keys()) if results else '[]'})\"\n", ")\n", "\n", "\n", "```\n", "\n", "\n", "There are several new concepts introduced in this Qiskit Function, but don’t worry—its core functionality is very similar to what you saw in [Tutorial 02](./02_arguments_and_results.ipynb). The main difference is that instead of running a single circuit, we now accept a list of circuits and use the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator to transpile each one in parallel before executing them. 
So while the code may look more complex, the idea is the same: prepare a circuit, run it on a backend, collect the results and save them.\n", "\n", "In the function above, the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator plays a key role:\n", "\n", "- It converts a regular Python function into a distributed task. This means each call to the function will be executed asynchronously on separate compute resources, allowing us to run multiple tasks in parallel.\n", "- When you call a distributed task, it returns a reference to the execution—not the result itself. To retrieve the actual result, you use the [`get()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html) method, which waits for the task to complete and then returns its output. The [`get(...)`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html) method accepts either a single `ObjectRef` or a sequence of them.\n", "\n", "\n", "In the function above, we apply [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) to our `distributed_transpilation(...)` function, so each call runs as an independent task in parallel.\n", "\n", "Once you understand how [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) works, the rest of the function becomes easy to follow:\n", "\n", "- We read the list of circuits from the function arguments using [`get_arguments()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.serializers.get_arguments.html#qiskit_serverless.serializers.get_arguments), then call `distributed_transpilation(...)` for each one. 
This creates a list of task references.\n", "- Each task receives a QuantumCircuit, a backend, and the index of the circuit (used for logging). It transpiles the circuit and returns an ISA circuit targeted to the selected backend.\n", "- These tasks run in parallel across the serverless compute environment. Once all are launched, we call [`get(...)`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html) on the list of references to collect the results.\n", "- After all transpilation tasks finish, we send the entire list of ISA circuits to the quantum computer in one batch via `Sampler(backend).run(isa_circuits)`.\n", "- Finally, we extract the counts from the sampler results and save them using [`save_result()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.save_result.html#qiskit_serverless.core.save_result), so they can be retrieved later via `job.result()`.\n", "\n", "\n", "So while this function follows the same overall flow—prepare → run → collect → save—that you’ve already seen, the use of [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) makes it easy to scale up and run workloads in parallel.\n", "This approach avoids queue contention from many small jobs: you get the scalability of parallel transpilation with the efficiency of one batched hardware execution.\n", "\n", "> ⚠ By default, each distributed task is allocated 1 CPU.\n", "For advanced resource allocation (such as requesting GPUs or custom memory limits), refer to the section titled “Advanced parallel workflow” later in this tutorial." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploying the function\n", "As in previous tutorials, we will now upload our function and run it.\n", "\n", "> ⚠ This provider is set up with default credentials to a test cluster intended to run on your machine. 
For information on setting up infrastructure on your local machine, check out the guide on [local infrastructure setup](https://qiskit.github.io/qiskit-serverless/deployment/local.html)." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os\n", "from qiskit_serverless import ServerlessClient\n", "from qiskit_serverless import QiskitFunction\n", "\n", "client = ServerlessClient(\n", " token=os.environ.get(\"GATEWAY_TOKEN\", \"awesome_token\"),\n", " instance=os.environ.get(\"GATEWAY_INSTANCE\", \"an_awesome_crn\"),\n", " host=os.environ.get(\"GATEWAY_HOST\", \"http://localhost:8000\"),\n", " # If you are using the Kubernetes approach, the URL must be http://localhost\n", ")\n", "\n", "function = QiskitFunction(\n", " title=\"function-with-parallel-workflow\",\n", " entrypoint=\"function_with_parallel_workflow.py\",\n", " working_dir=\"./source_files/\",\n", ")\n", "\n", "client.upload(function)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running the QiskitFunction\n", "\n", "### Testing environment\n", "After deploying `function-with-parallel-workflow`, let’s test it. For quick, credential‑free testing we’ll use a simulator. In this example we choose `FakeVigoV2` and pass its name as the `backend_name`. (The function detects “fake” in the name and constructs a fake provider internally.)\n", "As for circuits, let's create a list of random circuits which will be passed as arguments to the function." 
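, "\n", "\n", "The fake-provider auto-selection is just a substring check on the backend name. A minimal standalone sketch of that branch (the helper name `uses_fake_provider` is hypothetical; the check itself mirrors the function's `if \"fake\" in backend_name.lower():` line):\n", "\n", "```python\n", "def uses_fake_provider(backend_name: str) -> bool:\n", "    # Mirrors the provider choice in the function body: any name containing\n", "    # 'fake' (case-insensitive) is routed to FakeProviderForBackendV2.\n", "    return \"fake\" in backend_name.lower()\n", "\n", "print(uses_fake_provider(\"fake_vigo\"))   # True\n", "print(uses_fake_provider(\"ibm_torino\"))  # False\n", "```\n"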
] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "( ┌──────────────────────────┐┌──────────────┐ ░ ┌─┐ \n", " q_0: ┤ U3(4.5673,3.2818,2.9906) ├┤0 ├─░─┤M├───\n", " └────────────┬─────────────┘│ Rxx(4.6353) │ ░ └╥┘┌─┐\n", " q_1: ─────────────■──────────────┤1 ├─░──╫─┤M├\n", " └──────────────┘ ░ ║ └╥┘\n", " meas: 2/════════════════════════════════════════════════╩══╩═\n", " 0 1 ,\n", " [,\n", " ,\n", " ])" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from qiskit.circuit.random import random_circuit\n", "from qiskit_ibm_runtime.fake_provider import FakeVigoV2\n", "\n", "backend_name = FakeVigoV2().name # e.g., \"fake_vigo\"\n", "\n", "circuits = [random_circuit(2, 2) for _ in range(3)]\n", "for circuit in circuits:\n", " circuit.measure_all()\n", "circuits[0].draw(), circuits" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the function on the simulator backend:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "parallel_function = client.get(\"function-with-parallel-workflow\")\n", "# For fake backends, passing `service` is not required.\n", "job = parallel_function.run(circuits=circuits, backend_name=backend_name)\n", "job" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'QUEUED'" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "job.status()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'results': [{'11': 541, '00': 483}, {'00': 1024}, {'01': 542, '00': 482}]}" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "job.result() # Returns {'results': [...]}, one counts dict per circuit" 
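, "\n", "\n", "# Post-processing sketch (hypothetical, not part of the function): the saved\n", "# payload maps 'results' to one counts dict per circuit, so for example the\n", "# most likely bitstring per circuit can be extracted with:\n", "#\n", "#   counts_list = job.result()[\"results\"]\n", "#   most_likely = [max(c, key=c.get) for c in counts_list]\n"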
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the logs:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2025-11-12 12:35:53,422\tINFO job_manager.py:568 -- Runtime env is setting up.\n", "[main] Parsing arguments...\n", "[main] Inputs received (num_circuits=3, backend_name=fake_vigo)\n", "[main] Using fake provider (auto-selected because backend_name contains 'fake').\n", "[main] Backend resolved (name=fake_vigo)\n", "[main] Launching distributed transpilation tasks (count=3)...\n", "2025-11-12 12:36:01,229\tINFO worker.py:1692 -- Using address 172.18.0.3:6379 set in the environment variable RAY_ADDRESS\n", "2025-11-12 12:36:01,237\tINFO worker.py:1833 -- Connecting to existing Ray cluster at address: 172.18.0.3:6379...\n", "2025-11-12 12:36:01,356\tINFO worker.py:2004 -- Connected to Ray cluster. View the dashboard at \u001b[1m\u001b[32mhttp://172.18.0.3:8265 \u001b[39m\u001b[22m\n", "/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). 
To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0\n", " warnings.warn(\n", ": MADV_DONTNEED does not work (memset will be used instead)\n", ": (This is the expected behaviour if you are running under QEMU)\n", "[main] Waiting for transpilation tasks to finish...\n", "\u001b[36m(distributed_transpilation pid=1391)\u001b[0m : MADV_DONTNEED does not work (memset will be used instead)\n", "\u001b[36m(distributed_transpilation pid=1391)\u001b[0m : (This is the expected behaviour if you are running under QEMU)\n", "\u001b[36m(distributed_transpilation pid=1391)\u001b[0m [distributed_transpilation] Start (index=1, qubits=2, clbits=2, backend=fake_vigo)\n", "\u001b[36m(distributed_transpilation pid=1391)\u001b[0m [distributed_transpilation] Transpilation complete (index=1)\n", "[main] All transpilation tasks completed (isa_count=3)\n", "[main] Executing circuits on backend (name=fake_vigo)...\n", "[main] Circuit execution completed\n", "[main] Extracting counts from results...\n", "[main] Results saved (len(results) = 3; example_keys=['11', '00'])\n", "\u001b[36m(distributed_transpilation pid=1388)\u001b[0m [distributed_transpilation] Start (index=0, qubits=2, clbits=2, backend=fake_vigo)\u001b[32m [repeated 2x across cluster] (Ray deduplicates logs by default. 
Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)\u001b[0m\n", "\u001b[36m(distributed_transpilation pid=1388)\u001b[0m [distributed_transpilation] Transpilation complete (index=0)\u001b[32m [repeated 2x across cluster]\u001b[0m\n", "\u001b[36m(distributed_transpilation pid=1388)\u001b[0m : MADV_DONTNEED does not work (memset will be used instead)\u001b[32m [repeated 2x across cluster]\u001b[0m\n", "\u001b[36m(distributed_transpilation pid=1388)\u001b[0m : (This is the expected behaviour if you are running under QEMU)\u001b[32m [repeated 2x across cluster]\u001b[0m\n", "\n" ] } ], "source": [ "print(job.logs())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Advanced parallel workflow\n", "By default, the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator allocates `1` CPU per task, but this can be changed on request. 
The [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator can accept a [`Target`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html) object to specify resource allocations for each distributed task.\n", "If your `QiskitFunction` performs resource-intensive operations—such as GPU-accelerated transpilation or memory-heavy simulations—you can define the required resources by passing either:\n", " - a [`Target`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html) object or\n", " - a dictionary from which a [`Target`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html) can be instantiated via [`Target.from_dict(**kwargs)`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html).\n", "\n", "For example, if your transpilation logic benefits from GPU acceleration, you can annotate your task like this:\n", "```python\n", "@distribute_task(target={\"cpu\": 1, \"gpu\": 1})\n", "def distributed_transpilation(...):\n", " ...\n", "```\n", "This tells the serverless scheduler to allocate one CPU and one GPU for each task execution.\n", "For more details on how to define and use resource targets, refer to the [`Target` documentation](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Running on a real backend (IBM Quantum Platform)\n", "To run on real hardware (or managed cloud simulators), create a [`QiskitRuntimeService`](https://quantum.cloud.ibm.com/docs/en/guides/initialize-account) instance and pass it to the function alongside a real `backend_name`. You can either use previously saved credentials or supply them directly (uncomment as needed). Then select a backend—for example, the least busy operational device—and pass its name." 
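, "\n", "\n", "If you have not saved credentials yet, you can store them once so that later `QiskitRuntimeService()` calls need no arguments. A sketch using the standard `save_account` helper (the token and CRN below are placeholders):\n", "\n", "```python\n", "from qiskit_ibm_runtime import QiskitRuntimeService\n", "\n", "# One-time setup; `overwrite=True` replaces any previously saved account.\n", "QiskitRuntimeService.save_account(\n", "    channel=\"ibm_quantum_platform\",\n", "    token=\"API_KEY\",  # placeholder\n", "    instance=\"CRN\",  # placeholder\n", "    overwrite=True,\n", ")\n", "```\n"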
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from qiskit_ibm_runtime import QiskitRuntimeService\n", "\n", "# If you have saved your account, this can be empty:\n", "service = QiskitRuntimeService(\n", " # channel=\"ibm_quantum_platform\",\n", " # token=\"API_KEY\",\n", " # instance=\"CRN\",\n", ")\n", "\n", "backend = service.least_busy(simulator=False, operational=True)\n", "backend_name = backend.name # use the string name expected by the function\n", "\n", "job = parallel_function.run(circuits=circuits, backend_name=backend_name, service=service)\n", "job" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Retrieve results and logs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"Results: \\n {job.result()}\")\n", "print(f\"Logs: \\n {job.logs()}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.7" } }, "nbformat": 4, "nbformat_minor": 4 }