{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parallel workflows with QiskitFunction\n",
"\n",
"In this tutorial, we’ll explore how to run quantum workloads in parallel using Qiskit Serverless. The function defines a distributed task that transpiles and samples a single circuit on a chosen backend; then the main body launches one task per circuit and gathers the results.\n",
"\n",
"Instead of executing a single circuit like in [Tutorial 02](./02_arguments_and_results.ipynb), we’ll pass a list of circuits and use the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator to transpile each one concurrently across distributed compute resources. Then, we will send them in one batch for execution.\n",
"\n",
"## Prep work\n",
"To utilize prallelism, the function we will write needs to have a method that can run independently from the main thread. Transpilation of a circuit can run independently from other tanspilations so we will make it our parallel task.\n",
"\n",
"The function accepts:\n",
" - A list of circuits to run.\n",
" - A backend name (either a simulator or a real device).\n",
" - An optional service object for accessing IBM Quantum backends.\n",
"\n",
"The function output:\n",
" - Each circuit result's counts.\n",
"\n",
"\n",
"### Write the function\n",
"\n",
"Let's take a look at the function file [./source_files/function_with_parallel_workflow.py](./source_files/function_with_parallel_workflow.py). \n",
"\n",
"```python\n",
"\"\"\"function with parallel workflow for jupyter notebook.\"\"\"\n",
"import os\n",
"from qiskit import QuantumCircuit\n",
"from qiskit.providers import BackendV2\n",
"from qiskit.providers.exceptions import QiskitBackendNotFoundError\n",
"from qiskit.transpiler import generate_preset_pass_manager\n",
"from qiskit_ibm_runtime import QiskitRuntimeService\n",
"from qiskit_ibm_runtime.fake_provider import FakeProviderForBackendV2\n",
"from qiskit_ibm_runtime import SamplerV2 as Sampler\n",
"from qiskit_serverless import get_arguments, save_result, distribute_task, get\n",
"\n",
"\n",
"@distribute_task()\n",
"def distributed_transpilation(\n",
" circuit_idx: int, circuit: QuantumCircuit, target_backend: BackendV2\n",
"):\n",
" \"\"\"Distributed task that returns an ISA circuit ready for execution.\"\"\"\n",
" print(\n",
" f\"[distributed_transpilation] Start (index={circuit_idx}, \"\n",
" f\"qubits={circuit.num_qubits}, clbits={circuit.num_clbits}, backend={target_backend.name})\"\n",
" )\n",
" pm = generate_preset_pass_manager(backend=target_backend, optimization_level=3)\n",
" isa_circuit = pm.run(circuit)\n",
" print(\"[distributed_transpilation] Transpilation complete (optimization_level=3)\")\n",
" return isa_circuit\n",
"\n",
"\n",
"# ----- parse inputs -----\n",
"# get all arguments passed to this function\n",
"print(\"[main] Parsing arguments...\")\n",
"arguments = get_arguments()\n",
"\n",
"# Extract inputs we care about\n",
"circuits = arguments.get(\"circuits\")\n",
"backend_name = arguments.get(\"backend_name\")\n",
"service = arguments.get(\"service\")\n",
"\n",
"# Normalize inputs\n",
"if circuits is None:\n",
" raise ValueError(\n",
" \"`circuits` is required and must be a QuantumCircuit or a list of them.\"\n",
" )\n",
"if not isinstance(circuits, list):\n",
" circuits = [circuits]\n",
"\n",
"# Basic validation\n",
"if not all(isinstance(circuit, QuantumCircuit) for circuit in circuits):\n",
" raise ValueError(\"`circuits` must be a list of qiskit.QuantumCircuit objects.\")\n",
"if not isinstance(backend_name, str) or len(backend_name) == 0:\n",
" raise ValueError(\"backend_name must be a non-empty string.\")\n",
"\n",
"print(\n",
" f\"[main] Inputs received (num_circuits={len(circuits)}, backend_name={backend_name})\"\n",
")\n",
"\n",
"# ----- resolve provider / backend -----\n",
"# Choose a provider: fake provider for local testing, or a real servic\n",
"if \"fake\" in backend_name.lower():\n",
" print(\n",
" \"[main] Using fake provider (auto-selected because backend_name contains 'fake').\"\n",
" )\n",
" service = FakeProviderForBackendV2()\n",
"\n",
"if isinstance(service, (FakeProviderForBackendV2, QiskitRuntimeService)):\n",
" try:\n",
" backend = service.backend(backend_name)\n",
" print(f\"[main] Backend resolved (name={backend.name})\")\n",
" except QiskitBackendNotFoundError as e:\n",
" raise ValueError(f\"Error retrieving backend {backend_name}: {e}\") from e\n",
"else:\n",
" # Fallback: build a Runtime service from environment variables\n",
" print(\n",
" \"[main] No service provided and backend not fake; \"\n",
" \"attempting to initialize QiskitRuntimeService from environment variables...\"\n",
" )\n",
" try:\n",
" service = QiskitRuntimeService(\n",
" channel=os.environ.get(\"QISKIT_IBM_CHANNEL\"),\n",
" token=os.environ.get(\"QISKIT_IBM_TOKEN\"),\n",
" instance=os.environ.get(\"QISKIT_IBM_INSTANCE\"),\n",
" url=os.environ.get(\"QISKIT_IBM_URL\"),\n",
" )\n",
" backend = service.backend(backend_name)\n",
" print(\n",
" f\"[main] Runtime service initialized from env and backend \"\n",
" f\"resolved (name={backend.name})\"\n",
" )\n",
" except QiskitBackendNotFoundError as e:\n",
" raise ValueError(f\"The backend named {backend_name} couldn't be found.\") from e\n",
" except Exception as e:\n",
" raise ValueError(\n",
" f\"`QiskitRuntimeService` couldn't be initialized with os environment variables: {e}.\"\n",
" ) from e\n",
"\n",
"\n",
"# ----- launch parallel tasks -----\n",
"\n",
"# get task references (async, parallel on the serverless cluster)\n",
"print(f\"[main] Launching distributed transpilation tasks (count={len(circuits)})...\")\n",
"# sending circuit indexing for\n",
"sample_task_references = [\n",
" distributed_transpilation(idx, circuit, backend) for idx, circuit in enumerate(circuits)\n",
"]\n",
"\n",
"# ----- collect ISA circuits -----\n",
"# collect all results (blocks until all tasks complete)\n",
"print(\"[main] Waiting for transpilation tasks to finish...\")\n",
"isa_circuits = get(sample_task_references)\n",
"print(f\"[main] All transpilation tasks completed (isa_count={len(isa_circuits)})\")\n",
"\n",
"# ----- batch execute on the quantum computer -----\n",
"print(f\"[main] Executing circuits on backend (name={backend.name})...\")\n",
"pub_results = Sampler(backend).run(isa_circuits).result()\n",
"print(\"[main] Circuit execution completed\")\n",
"\n",
"print(\"[main] Extracting counts from results...\")\n",
"results = [r.data.meas.get_counts() for r in pub_results]\n",
"\n",
"# ----- persist results -----\n",
"# persist results so `job.result()` returns them\n",
"save_result({\"results\": results})\n",
"print(\n",
" \"[main] Results saved (len(results) = \"\n",
" f\"{len(results)}; example_keys={list(results[0].keys()) if results else '[]'})\"\n",
")\n",
"\n",
"\n",
"```\n",
"\n",
"\n",
"There are several new concepts introduced in this Qiskit Function, but don’t worry—its core functionality is very similar to what you saw in [Tutorial 02](./02_arguments_and_results.ipynb). The main difference is that instead of running a single circuit, we now accept a list of circuits and use the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator to transpile each one in parallel before executing them. So while the code may look more complex, the idea is the same: prepare a circuit, run it on a backend, collect the results and save them.\n",
"\n",
"In the function above, the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator plays a key role:\n",
"\n",
"- It converts a regular Python function into a distributed task. This means each call to the function will be executed asynchronously on separate compute resources, allowing us to run multiple tasks in parallel.\n",
"- When you call a distributed task, it returns a reference to the execution—not the result itself. To retrieve the actual result, you use the [`get()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html) method, which waits for the task to complete and then returns its output. The [`get(...)`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html) method accept either a single `ObjectRef` or a sequence of them.\n",
"\n",
"\n",
"In the function above, we apply [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) to our `distributed_transpilation(...)` method, so each call runs in parallel as each is an independent task.\n",
"\n",
"Once you understand how [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) works, the rest of the function becomes easy to follow:\n",
"\n",
"- We read the list of circuits from the function arguments using [`get_arguments()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.serializers.get_arguments.html#qiskit_serverless.serializers.get_arguments), then call `distributed_transpilation(...)` for each one. This creates a list of task references.\n",
"- Each task receives a QuantumCircuit, a backend, and the index of the circuit (used for logging). It transpiles the circuit and returns an ISA circuit targeted to the selected backend.\n",
"- These tasks run in parallel across the serverless compute environment. Once all are launched, we call [`get(...)`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html) on the list of references to collect the results.\n",
"- After all transpilation tasks finish, we send the entire list of ISA circuits to the quantum computer in one batch via Sampler(backend).run(isa_circuits).\n",
"- Finally, we extract the counts from the sampler results and save them using [`save_resul()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.save_result.html#qiskit_serverless.core.save_result), so they can be retrieved later via `job.result()`.\n",
"\n",
"\n",
"So while this function does the same overall flow—prepare → run → collect → save—that you’ve already seen, the use of [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) makes it easy to scale up and run workloads in parallel.\n",
"This approach avoids queue contention from many small jobs: you get the scalability of parallel transpilation with the efficiency of one batched hardware execution.\n",
"\n",
"> ⚠ By default, each distributed task is allocated 1 CPU.\n",
"For advanced resource allocation (such as requesting GPUs or custom memory limits), refer to the section titled “Advanced Parallel Workflow” later in this tutorial."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploying the function\n",
"As in previous tutorials, we will now upload our function and run it.\n",
"\n",
"> ⚠ This provider is set up with default credentials to a test cluster intended to run on your machine. For information on setting up infrastructure on your local machine, check out the guide on [local infrastructure setup](https://qiskit.github.io/qiskit-serverless/deployment/local.html)."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import os\n",
"from qiskit_serverless import ServerlessClient\n",
"from qiskit_serverless import QiskitFunction\n",
"\n",
"client = ServerlessClient(\n",
" token=os.environ.get(\"GATEWAY_TOKEN\", \"awesome_token\"),\n",
" instance=os.environ.get(\"GATEWAY_INSTANCE\", \"an_awesome_crn\"),\n",
" host=os.environ.get(\"GATEWAY_HOST\", \"http://localhost:8000\"),\n",
" # If you are using the kubernetes approach the URL must be http://localhost\n",
")\n",
"\n",
"function = QiskitFunction(\n",
" title=\"function-with-parallel-workflow\",\n",
" entrypoint=\"function_with_parallel_workflow.py\",\n",
" working_dir=\"./source_files/\",\n",
")\n",
"\n",
"client.upload(function)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running the QiskitFunction\n",
"\n",
"### Testing environment\n",
"After deploying `function-with-parallel-workflow`, let’s test it. For quick, credential‑free testing we’ll use a simulator. In this example we choose `FakeVigoV2` and pass its name as the `backend_name`. (The function detects “fake” in the name and constructs a fake provider internally.)\n",
"As for circuits, lets create a list of random circuits which we will be passed as arguments to the function."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"( ┌──────────────────────────┐┌──────────────┐ ░ ┌─┐ \n",
" q_0: ┤ U3(4.5673,3.2818,2.9906) ├┤0 ├─░─┤M├───\n",
" └────────────┬─────────────┘│ Rxx(4.6353) │ ░ └╥┘┌─┐\n",
" q_1: ─────────────■──────────────┤1 ├─░──╫─┤M├\n",
" └──────────────┘ ░ ║ └╥┘\n",
" meas: 2/════════════════════════════════════════════════╩══╩═\n",
" 0 1 ,\n",
" [,\n",
" ,\n",
" ])"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from qiskit import QuantumCircuit\n",
"from qiskit.circuit.random import random_circuit\n",
"from qiskit_ibm_runtime.fake_provider import FakeVigoV2\n",
"\n",
"backend_name = FakeVigoV2().name # e.g., \"fake_vigo\"\n",
"\n",
"circuits = [random_circuit(2, 2) for _ in range(3)]\n",
"[circuit.measure_all() for circuit in circuits]\n",
"circuits[0].draw(), circuits"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the function on the simulator backend:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parallel_function = client.get(\"function-with-parallel-workflow\")\n",
"# For fake backends, passing `service` is not required.\n",
"job = parallel_function.run(circuits=circuits, backend_name=backend_name)\n",
"job"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'QUEUED'"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"job.status()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'results': [{'11': 541, '00': 483}, {'00': 1024}, {'01': 542, '00': 482}]}"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"job.result() # Returns {'quasi_dists': {...}}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check the logs:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2025-11-12 12:35:53,422\tINFO job_manager.py:568 -- Runtime env is setting up.\n",
"[main] Parsing arguments...\n",
"[main] Inputs received (num_circuits=3, backend_name=fake_vigo)\n",
"[main] Using fake provider (auto-selected because backend_name contains 'fake').\n",
"[main] Backend resolved (name=fake_vigo)\n",
"[main] Launching distributed transpilation tasks (count=3)...\n",
"2025-11-12 12:36:01,229\tINFO worker.py:1692 -- Using address 172.18.0.3:6379 set in the environment variable RAY_ADDRESS\n",
"2025-11-12 12:36:01,237\tINFO worker.py:1833 -- Connecting to existing Ray cluster at address: 172.18.0.3:6379...\n",
"2025-11-12 12:36:01,356\tINFO worker.py:2004 -- Connected to Ray cluster. View the dashboard at \u001b[1m\u001b[32mhttp://172.18.0.3:8265 \u001b[39m\u001b[22m\n",
"/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0\n",
" warnings.warn(\n",
": MADV_DONTNEED does not work (memset will be used instead)\n",
": (This is the expected behaviour if you are running under QEMU)\n",
"[main] Waiting for transpilation tasks to finish...\n",
"\u001b[36m(distributed_transpilation pid=1391)\u001b[0m : MADV_DONTNEED does not work (memset will be used instead)\n",
"\u001b[36m(distributed_transpilation pid=1391)\u001b[0m : (This is the expected behaviour if you are running under QEMU)\n",
"\u001b[36m(distributed_transpilation pid=1391)\u001b[0m [distributed_transpilation] Start (index=1, qubits=2, clbits=2, backend=fake_vigo)\n",
"\u001b[36m(distributed_transpilation pid=1391)\u001b[0m [distributed_transpilation] Transpilation complete (index=1)\n",
"[main] All transpilation tasks completed (isa_count=3)\n",
"[main] Executing circuits on backend (name=fake_vigo)...\n",
"[main] Circuit execution completed\n",
"[main] Extracting counts from results...\n",
"[main] Results saved (len(results) = 3; example_keys=['11', '00'])\n",
"\u001b[36m(distributed_transpilation pid=1388)\u001b[0m [distributed_transpilation] Start (index=0, qubits=2, clbits=2, backend=fake_vigo)\u001b[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)\u001b[0m\n",
"\u001b[36m(distributed_transpilation pid=1388)\u001b[0m [distributed_transpilation] Transpilation complete (index=0)\u001b[32m [repeated 2x across cluster]\u001b[0m\n",
"\u001b[36m(distributed_transpilation pid=1388)\u001b[0m : MADV_DONTNEED does not work (memset will be used instead)\u001b[32m [repeated 2x across cluster]\u001b[0m\n",
"\u001b[36m(distributed_transpilation pid=1388)\u001b[0m : (This is the expected behaviour if you are running under QEMU)\u001b[32m [repeated 2x across cluster]\u001b[0m\n",
"\n"
]
}
],
"source": [
"print(job.logs())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Advance parallel workflow\n",
"By defulat, the [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator allocate `1` cpu for a task, but it can changed upon request. The [`@distribute_task()`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task) decorator can accept a [`Target`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html) object to specify resource allocations for each distributed task.\n",
"If your `QiskitFunction` performs resource-intensive operations—such as GPU-accelerated transpilation or memory-heavy simulations—you can define the required resources by passing either:\n",
" - a [`Target`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html) object or\n",
" - a dictionary that can intiate a [`Target`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html) by using [`Target.from_dict(**kwargs)`](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html).\n",
"\n",
"For example, if your transpilation logic benefits from GPU acceleration, you can annotate your task like this:\n",
"```python\n",
"@distribute_task(target={\"cpu\": 1, \"gpu\": 1})\n",
"def distributed_sample(...):\n",
" ...\n",
"```\n",
"This tells the serverless scheduler to allocate one CPU and one GPU for each task execution.\n",
"For more details on how to define and use resource targets, refer to the [`Target` documantation](https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Running on a real backend (IBM Quantum Platform)\n",
"To run on real hardware (or managed cloud simulators), create a [`QiskitRuntimeService`](https://quantum.cloud.ibm.com/docs/en/guides/initialize-account) instance and pass it to the function alongside a real backend_name. You can either use previously saved credentials or supply them directly (uncomment as needed). Then select a backend—for example, the least busy operational device—and pass its name."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from qiskit_ibm_runtime import QiskitRuntimeService\n",
"\n",
"# If you have saved your account, this can be empty:\n",
"service = QiskitRuntimeService(\n",
" # channel=\"ibm_quantum_platform\",\n",
" # token=\"API_KEY\",\n",
" # instance=\"CRN\",\n",
")\n",
"\n",
"backend = service.least_busy(simulator=False, operational=True)\n",
"backend_name = backend.name # use the string name expected by the function\n",
"\n",
"job = my_function.run(circuit=circuit, backend_name=backend_name, service=service)\n",
"job"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Retrieve results and logs:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f\"Results: \\n {job.result()}\")\n",
"print(f\"Logs: \\n {job.logs()})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.7"
}
},
"nbformat": 4,
"nbformat_minor": 4
}