Parallel workflows with QiskitFunction¶
In this tutorial, we’ll explore how to run quantum workloads in parallel using Qiskit Serverless. The function defines a distributed task that transpiles a single circuit for a chosen backend; the main body then launches one task per circuit, gathers the resulting ISA circuits, and samples them all in a single batch.
Instead of executing a single circuit like in Tutorial 02, we’ll pass a list of circuits and use the `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ decorator to transpile each one concurrently across distributed compute resources. Then, we will send them in one batch for execution.
Prep work¶
To utilize parallelism, the function we write needs a method that can run independently of the main thread. Transpiling one circuit does not depend on transpiling the others, so transpilation will be our parallel task.
The function accepts:
A list of circuits to run.
A backend name (either a simulator or a real device).
An optional service object for accessing IBM Quantum backends.
The function returns:
The measurement counts for each circuit.
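For concreteness, here is a hedged sketch of the arguments payload the function expects; the key names match the parsing code shown below, while the circuit and the values are only illustrative.
from qiskit import QuantumCircuit

qc = QuantumCircuit(2)
qc.h(0)
qc.cx(0, 1)
qc.measure_all()

# These keys match what the function reads via get_arguments():
arguments = {
    "circuits": [qc],             # a QuantumCircuit or a list of them
    "backend_name": "fake_vigo",  # "fake_*" names select a local fake provider
    "service": None,              # optional QiskitRuntimeService instance
}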
Write the function¶
Let’s take a look at the function file ./source_files/function_with_parallel_workflow.py.
"""function with parallel workflow for jupyter notebook."""
import os
from qiskit import QuantumCircuit
from qiskit.providers import BackendV2
from qiskit.providers.exceptions import QiskitBackendNotFoundError
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_ibm_runtime.fake_provider import FakeProviderForBackendV2
from qiskit_ibm_runtime import SamplerV2 as Sampler
from qiskit_serverless import get_arguments, save_result, distribute_task, get
@distribute_task()
def distributed_transpilation(
circuit_idx: int, circuit: QuantumCircuit, target_backend: BackendV2
):
"""Distributed task that returns an ISA circuit ready for execution."""
print(
f"[distributed_transpilation] Start (index={circuit_idx}, "
f"qubits={circuit.num_qubits}, clbits={circuit.num_clbits}, backend={target_backend.name})"
)
pm = generate_preset_pass_manager(backend=target_backend, optimization_level=3)
isa_circuit = pm.run(circuit)
print("[distributed_transpilation] Transpilation complete (optimization_level=3)")
return isa_circuit
# ----- parse inputs -----
# get all arguments passed to this function
print("[main] Parsing arguments...")
arguments = get_arguments()
# Extract inputs we care about
circuits = arguments.get("circuits")
backend_name = arguments.get("backend_name")
service = arguments.get("service")
# Normalize inputs
if circuits is None:
raise ValueError(
"`circuits` is required and must be a QuantumCircuit or a list of them."
)
if not isinstance(circuits, list):
circuits = [circuits]
# Basic validation
if not all(isinstance(circuit, QuantumCircuit) for circuit in circuits):
raise ValueError("`circuits` must be a list of qiskit.QuantumCircuit objects.")
if not isinstance(backend_name, str) or len(backend_name) == 0:
raise ValueError("backend_name must be a non-empty string.")
print(
f"[main] Inputs received (num_circuits={len(circuits)}, backend_name={backend_name})"
)
# ----- resolve provider / backend -----
# Choose a provider: a fake provider for local testing, or a real service
if "fake" in backend_name.lower():
print(
"[main] Using fake provider (auto-selected because backend_name contains 'fake')."
)
service = FakeProviderForBackendV2()
if isinstance(service, (FakeProviderForBackendV2, QiskitRuntimeService)):
try:
backend = service.backend(backend_name)
print(f"[main] Backend resolved (name={backend.name})")
except QiskitBackendNotFoundError as e:
raise ValueError(f"Error retrieving backend {backend_name}: {e}") from e
else:
# Fallback: build a Runtime service from environment variables
print(
"[main] No service provided and backend not fake; "
"attempting to initialize QiskitRuntimeService from environment variables..."
)
try:
service = QiskitRuntimeService(
channel=os.environ.get("QISKIT_IBM_CHANNEL"),
token=os.environ.get("QISKIT_IBM_TOKEN"),
instance=os.environ.get("QISKIT_IBM_INSTANCE"),
url=os.environ.get("QISKIT_IBM_URL"),
)
backend = service.backend(backend_name)
print(
f"[main] Runtime service initialized from env and backend "
f"resolved (name={backend.name})"
)
except QiskitBackendNotFoundError as e:
raise ValueError(f"The backend named {backend_name} couldn't be found.") from e
except Exception as e:
raise ValueError(
f"`QiskitRuntimeService` couldn't be initialized with os environment variables: {e}."
) from e
# ----- launch parallel tasks -----
# get task references (async, parallel on the serverless cluster)
print(f"[main] Launching distributed transpilation tasks (count={len(circuits)})...")
# pass each circuit's index so the task can identify itself in the logs
transpilation_task_references = [
    distributed_transpilation(idx, circuit, backend) for idx, circuit in enumerate(circuits)
]
# ----- collect ISA circuits -----
# collect all results (blocks until all tasks complete)
print("[main] Waiting for transpilation tasks to finish...")
isa_circuits = get(transpilation_task_references)
print(f"[main] All transpilation tasks completed (isa_count={len(isa_circuits)})")
# ----- batch execute on the quantum computer -----
print(f"[main] Executing circuits on backend (name={backend.name})...")
pub_results = Sampler(backend).run(isa_circuits).result()
print("[main] Circuit execution completed")
print("[main] Extracting counts from results...")
results = [r.data.meas.get_counts() for r in pub_results]
# ----- persist results -----
# persist results so `job.result()` returns them
save_result({"results": results})
print(
"[main] Results saved (len(results) = "
f"{len(results)}; example_keys={list(results[0].keys()) if results else '[]'})"
)
There are several new concepts introduced in this Qiskit Function, but don’t worry—its core functionality is very similar to what you saw in Tutorial 02. The main difference is that instead of running a single circuit, we now accept a list of circuits and use the `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ decorator to transpile each one in
parallel before executing them. So while the code may look more complex, the idea is the same: prepare a circuit, run it on a backend, collect the results and save them.
In the function above, the `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ decorator plays a key role:
It converts a regular Python function into a distributed task. This means each call to the function will be executed asynchronously on separate compute resources, allowing us to run multiple tasks in parallel.
When you call a distributed task, it returns a reference to the execution—not the result itself. To retrieve the actual result, you use the
`get() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html>`__ method, which waits for the task to complete and then returns its output. The `get(...) <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html>`__ method accepts either a single ObjectRef or a sequence of them.
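To make the reference-versus-result distinction concrete, here is a minimal sketch of the pattern, meant to run inside a function entrypoint on the serverless cluster; the add_one task is hypothetical and exists only for illustration.
from qiskit_serverless import distribute_task, get

@distribute_task()
def add_one(x: int) -> int:
    """Hypothetical task: each call executes asynchronously on the cluster."""
    return x + 1

# Each call returns an ObjectRef immediately, without blocking.
references = [add_one(i) for i in range(3)]

# get() blocks until every task finishes and returns the outputs in order.
print(get(references))  # [1, 2, 3]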
In the function above, we apply `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ to our distributed_transpilation(...) function, so each call runs in parallel as an independent task.
Once you understand how `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ works, the rest of the function becomes easy to follow:
We read the list of circuits from the function arguments using
`get_arguments() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.serializers.get_arguments.html#qiskit_serverless.serializers.get_arguments>`__, then call distributed_transpilation(...) for each one. This creates a list of task references. Each task receives a QuantumCircuit, a backend, and the index of the circuit (used for logging). It transpiles the circuit and returns an ISA circuit targeted to the selected backend.
These tasks run in parallel across the serverless compute environment. Once all are launched, we call
`get(...) <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.get.html>`__ on the list of references to collect the results. After all transpilation tasks finish, we send the entire list of ISA circuits to the quantum computer in one batch via Sampler(backend).run(isa_circuits).
Finally, we extract the counts from the sampler results and save them using
`save_result() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.save_result.html#qiskit_serverless.core.save_result>`__, so they can be retrieved later via job.result().
So while this function does the same overall flow—prepare → run → collect → save—that you’ve already seen, the use of `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ makes it easy to scale up and run workloads in parallel. This approach avoids queue contention from many small jobs: you get the scalability of parallel transpilation with the efficiency of one batched hardware execution.
⚠ By default, each distributed task is allocated 1 CPU. For advanced resource allocation (such as requesting GPUs or custom memory limits), refer to the section titled “Advanced parallel workflow” later in this tutorial.
Deploying the function¶
As in previous tutorials, we will now upload our function and run it.
⚠ This provider is set up with default credentials to a test cluster intended to run on your machine. For information on setting up infrastructure on your local machine, check out the guide on local infrastructure setup.
[1]:
import os
from qiskit_serverless import ServerlessClient
from qiskit_serverless import QiskitFunction
client = ServerlessClient(
token=os.environ.get("GATEWAY_TOKEN", "awesome_token"),
instance=os.environ.get("GATEWAY_INSTANCE", "an_awesome_crn"),
host=os.environ.get("GATEWAY_HOST", "http://localhost:8000"),
# If you are using the Kubernetes approach, the URL must be http://localhost
)
function = QiskitFunction(
title="function-with-parallel-workflow",
entrypoint="function_with_parallel_workflow.py",
working_dir="./source_files/",
)
client.upload(function)
client
[1]:
<gateway-client>
Running the QiskitFunction¶
Testing environment¶
After deploying function-with-parallel-workflow, let’s test it. For quick, credential‑free testing we’ll use a simulator. In this example we choose FakeVigoV2 and pass its name as the backend_name. (The function detects “fake” in the name and constructs a fake provider internally.) As for circuits, let’s create a list of random circuits, which we will pass as arguments to the function.
[2]:
from qiskit import QuantumCircuit
from qiskit.circuit.random import random_circuit
from qiskit_ibm_runtime.fake_provider import FakeVigoV2
backend_name = FakeVigoV2().name # e.g., "fake_vigo"
circuits = [random_circuit(2, 2) for _ in range(3)]
for circuit in circuits:
    circuit.measure_all()
circuits[0].draw(), circuits
[2]:
( ┌──────────────────────────┐┌──────────────┐ ░ ┌─┐
q_0: ┤ U3(4.5673,3.2818,2.9906) ├┤0 ├─░─┤M├───
└────────────┬─────────────┘│ Rxx(4.6353) │ ░ └╥┘┌─┐
q_1: ─────────────■──────────────┤1 ├─░──╫─┤M├
└──────────────┘ ░ ║ └╥┘
meas: 2/════════════════════════════════════════════════╩══╩═
0 1 ,
[<qiskit.circuit.quantumcircuit.QuantumCircuit at 0x129f26200>,
<qiskit.circuit.quantumcircuit.QuantumCircuit at 0x129f262b0>,
<qiskit.circuit.quantumcircuit.QuantumCircuit at 0x129f26360>])
Run the function on the simulator backend:
[3]:
parallel_function = client.get("function-with-parallel-workflow")
# For fake backends, passing `service` is not required.
job = parallel_function.run(circuits=circuits, backend_name=backend_name)
job
[3]:
<Job | 655d139d-e7e2-42ae-8cb3-e0bce8d6730d>
[4]:
job.status()
[4]:
'QUEUED'
[5]:
job.result() # Returns {'results': [...]}
[5]:
{'results': [{'11': 541, '00': 483}, {'00': 1024}, {'01': 542, '00': 482}]}
Check the logs:
[6]:
print(job.logs())
2025-11-12 12:35:53,422 INFO job_manager.py:568 -- Runtime env is setting up.
[main] Parsing arguments...
[main] Inputs received (num_circuits=3, backend_name=fake_vigo)
[main] Using fake provider (auto-selected because backend_name contains 'fake').
[main] Backend resolved (name=fake_vigo)
[main] Launching distributed transpilation tasks (count=3)...
2025-11-12 12:36:01,229 INFO worker.py:1692 -- Using address 172.18.0.3:6379 set in the environment variable RAY_ADDRESS
2025-11-12 12:36:01,237 INFO worker.py:1833 -- Connecting to existing Ray cluster at address: 172.18.0.3:6379...
2025-11-12 12:36:01,356 INFO worker.py:2004 -- Connected to Ray cluster. View the dashboard at http://172.18.0.3:8265
/usr/local/lib64/python3.11/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
warnings.warn(
<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
[main] Waiting for transpilation tasks to finish...
(distributed_transpilation pid=1391) <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
(distributed_transpilation pid=1391) <jemalloc>: (This is the expected behaviour if you are running under QEMU)
(distributed_transpilation pid=1391) [distributed_transpilation] Start (index=1, qubits=2, clbits=2, backend=fake_vigo)
(distributed_transpilation pid=1391) [distributed_transpilation] Transpilation complete (index=1)
[main] All transpilation tasks completed (isa_count=3)
[main] Executing circuits on backend (name=fake_vigo)...
[main] Circuit execution completed
[main] Extracting counts from results...
[main] Results saved (len(results) = 3; example_keys=['11', '00'])
(distributed_transpilation pid=1388) [distributed_transpilation] Start (index=0, qubits=2, clbits=2, backend=fake_vigo) [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
(distributed_transpilation pid=1388) [distributed_transpilation] Transpilation complete (index=0) [repeated 2x across cluster]
(distributed_transpilation pid=1388) <jemalloc>: MADV_DONTNEED does not work (memset will be used instead) [repeated 2x across cluster]
(distributed_transpilation pid=1388) <jemalloc>: (This is the expected behaviour if you are running under QEMU) [repeated 2x across cluster]
Advanced parallel workflow¶
By default, the `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ decorator allocates 1 CPU per task, but this can be changed upon request. The `@distribute_task() <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.distribute_task.html#qiskit_serverless.core.distribute_task>`__ decorator can accept a
`Target <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html>`__ object to specify resource allocations for each distributed task. If your QiskitFunction performs resource-intensive operations—such as GPU-accelerated transpilation or memory-heavy simulations—you can define the required resources by passing either:
a `Target <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html>`__ object, or
a dictionary that can initialize a `Target <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html>`__ via `Target.from_dict(**kwargs) <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html>`__.
For example, if your transpilation logic benefits from GPU acceleration, you can annotate your task like this:
@distribute_task(target={"cpu": 1, "gpu": 1})
def distributed_sample(...):
...
This tells the serverless scheduler to allocate one CPU and one GPU for each task execution. For more details on how to define and use resource targets, refer to the `Target documentation <https://qiskit.github.io/qiskit-serverless/stubs/qiskit_serverless.core.Target.html>`__.
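As a hedged alternative sketch, the same request could be written with an explicit Target object; this assumes, based on the dictionary keys above and on Target.from_dict(**kwargs), that Target exposes cpu and gpu fields (check the Target documentation to confirm).
from qiskit_serverless import distribute_task
from qiskit_serverless.core import Target

# Assumption: Target accepts cpu/gpu keyword fields mirroring the dict form above.
@distribute_task(target=Target(cpu=1, gpu=1))
def distributed_sample(circuit):
    ...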
Running on a real backend (IBM Quantum Platform)¶
To run on real hardware (or managed cloud simulators), create a `QiskitRuntimeService <https://quantum.cloud.ibm.com/docs/en/guides/initialize-account>`__ instance and pass it to the function alongside a real backend_name. You can either use previously saved credentials or supply them directly (uncomment as needed). Then select a backend—for example, the least busy operational device—and pass its name.
[ ]:
from qiskit_ibm_runtime import QiskitRuntimeService
# If you have saved your account, this can be empty:
service = QiskitRuntimeService(
# channel="ibm_quantum_platform",
# token="API_KEY",
# instance="CRN",
)
backend = service.least_busy(simulator=False, operational=True)
backend_name = backend.name # use the string name expected by the function
job = parallel_function.run(circuits=circuits, backend_name=backend_name, service=service)
job
Retrieve results and logs:
[ ]:
print(f"Results: \n {job.result()}")
print(f"Logs: \n {job.logs()})