Implicit solvent calculations using Qiskit Serverless
Usage estimate: 2 minutes on a Heron r2 processor (NOTE: This is an estimate only. Your runtime may vary.)
Learning outcomes
After going through this tutorial, users should understand:
- How to configure and run a remote workflow using Qiskit Serverless
- How to compute implicit solvent effects using a quantum computer
Prerequisites
We suggest that users are familiar with the following topic before going through this tutorial:
Background
Implicit solvent calculations are frequently used in computational biophysics. These models describe how a solute compound interacts with a solvent without modeling the solvent system directly. Instead, the solute model is wrapped in a mathematical representation of a dielectric medium characterized empirically. This dielectric medium interacts with the solute, which is itself modeled directly, and influences characteristics of the solute system, such as its ground state energy, by interacting with its electron field. This matters for biophysical models used, say, in drug discovery, because compounds behave differently in different dielectric environments: modeling a compound in vacuum (in vacuo) will describe different behavior than modeling it in water. Since pharmaceutical compounds must enter the human body, which is composed mostly of water, it is helpful to model a compound in an aqueous solution instead of in vacuo. Implicit solvent models achieve this cheaply, although the result is generally more approximate than that of the more computationally costly explicit solvent models, which represent both solute and solvent molecules directly.
In this tutorial, we demonstrate how a quantum algorithm, Sample-based Quantum Diagonalization (SQD), can be worked into a relatively computationally cheap implicit solvent model. In the example, we describe how methylamine behaves when it dissolves in water. We compare the quantum algorithm to CASCI, a state-of-the-art classical method, and demonstrate close agreement between the two computations. We showcase a quantum-centric supercomputing architecture in miniature, offloading the computationally expensive classical post-processing of the quantum sampling portion of the routine into a cloud-based environment within Qiskit Serverless. The code also demonstrates parallelization across the remotely available CPU cores to improve computation time.
Qiskit Serverless is a framework for running distributed quantum and classical workloads without managing infrastructure. There is no server provisioning (no spinning up EC2 instances, clusters, or Docker containers), no orchestration tools (Kubernetes, Docker Swarm), and no monitoring or maintenance. Each Serverless job runs in a clean container, executes your code, and then shuts down. There is no memory between jobs: you simply write your code and submit your job. Within a Serverless job, a program can seamlessly access IBM Quantum® backends and classically post-process the results. With Qiskit Serverless, users can access always-on remote CPU cores and memory, which allows certain classical workloads to be distributed across remote resources. Users also gain some advantage in parallel processing of programs while avoiding common headaches, such as a local machine shutting down mid-execution. For more information on Qiskit Serverless, see its documentation, as well as additional material on GitHub.
This tutorial shows a relevant application of the following:
- Sample-based quantum diagonalization
- Client-server computational models for quantum computing
This tutorial is inspired by and based upon research conducted at Cleveland Clinic, as described in Kaliakin, Danil, et al. "Implicit solvent sample-based quantum diagonalization." The Journal of Physical Chemistry B 129.23 (2025): 5788-5796, exposing the full workflow for implicit solvent calculations and extending it with iterative solvent self-consistency ("The Heartwood Algorithm", M. Motta, T. Pellegrini, 2025), geometry optimization, and automatic qubit layout selection. Please refer to the SQD IEF-PCM Qiskit Function template, jointly developed by Cleveland Clinic and IBM® based on Cleveland Clinic research, for a streamlined, black-box interface for running implicit solvent calculations.
Requirements
Before starting this tutorial, be sure you have the following installed:
- Qiskit SDK v2.0 or later with visualization support (pip install qiskit[visualization])
- Qiskit Runtime v0.40 or later (pip install qiskit-ibm-runtime)
- Qiskit IBM Catalog (pip install qiskit-ibm-catalog)
- Qiskit Serverless (pip install qiskit-serverless)
- Qiskit addon: Sample-based quantum diagonalization (SQD) v0.12.0 (pip install qiskit-addon-sqd)
- PySCF (pip install pyscf)
- ffsim (pip install ffsim)
- Matplotlib (pip install matplotlib)
- geomeTRIC (pip install geometric)
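Before running the tutorial, it can be useful to verify that every dependency is importable. The sketch below checks for installed packages without importing them; the import names are assumptions mapped from the pip package names above.

```python
# Check that the tutorial's dependencies are present without actually
# importing them (find_spec only locates a module, it does not load it).
from importlib.util import find_spec


def missing_packages(names):
    """Return the subset of `names` that is not installed."""
    return [name for name in names if find_spec(name) is None]


# Import names assumed from the pip package names in the requirements list
required = [
    "qiskit",
    "qiskit_ibm_runtime",
    "qiskit_ibm_catalog",
    "qiskit_serverless",
    "qiskit_addon_sqd",
    "pyscf",
    "ffsim",
    "matplotlib",
    "geometric",
]
for name in missing_packages(required):
    print(f"Missing dependency: {name}")
```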
Setup
# Establish Quantum Resource connection
from qiskit_ibm_runtime import QiskitRuntimeService
service = QiskitRuntimeService()
backend = service.least_busy()
print(f"Using backend {backend.name}")

# Establish Classical HPC Resource connection
from qiskit_ibm_catalog import QiskitFunction, QiskitServerless

client = QiskitServerless()

Create a directory named source_files next to the main notebook. Into this directory you will put the Python files that you intend to share with the remote compute environment. You must create two files:

- source_files/diagonalization_engine.py
- source_files/classical_simulation.py

Click to expand the text for each script below, then copy and paste the content into a local file with these path names.
source_files/diagonalization_engine.py:

#!/usr/bin/env python3
import numpy as np
from json.encoder import JSONEncoder
from json.decoder import JSONDecoder
from functools import partial
import os

from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task, get_arguments, get, save_result
from qiskit_addon_sqd.fermion import (
    SCIResult,
    diagonalize_fermionic_hamiltonian,
    solve_sci,
)

### Argument retrieval
args = get_arguments()
data = args["data"]  # Chemistry Data
energy_tol = args["energy_tol"]  # SQD option
occupancies_tol = args["occupancies_tol"]  # SQD option
max_iterations = args["max_iterations"]  # SQD option
symmetrize_spin = args["symmetrize_spin"]  # Eigenstate solver option
carryover_threshold = args["carryover_threshold"]  # Eigenstate solver option
num_batches = args["num_batches"]  # Eigenstate solver option
samples_per_batch = args["samples_per_batch"]  # Eigenstate solver option
max_cycle = args["max_cycle"]  # Eigenstate solver option
mem = args["mem"]  # Memory per Worker


# --- fan-out target: 1 CPU + mem GB RAM per call -------------
@distribute_task(target={"cpu": 1, "mem": mem * 1024**3})
def _solve_sci_worker(
    ix, ci_strs, one_body_tensor, two_body_tensor, norb, nelec, spin_sq
):
    print(f">>>>> WORKER {ix} INITIATED")
    res = solve_sci(
        ci_strs,
        one_body_tensor,
        two_body_tensor,
        norb=norb,
        nelec=nelec,
        spin_sq=spin_sq,
    )
    print(f">>>>> WORKER {ix} COMPLETE")
    return res


def distribute_solve_sci_batch(
    ci_strings: list[tuple[np.ndarray, np.ndarray]],
    one_body_tensor: np.ndarray,
    two_body_tensor: np.ndarray,
    norb: int,
    nelec: tuple[int, int],
    *,
    spin_sq: float | None = None,
    **kwargs,
) -> list[SCIResult]:
    """Diagonalize Hamiltonian in subspaces, parallelizing across vCPUs
    in the Serverless environment.

    Args:
        ci_strings: List of pairs (strings_a, strings_b) of arrays of
            spin-alpha CI strings and spin-beta CI strings whose Cartesian
            product gives the basis of the subspace in which to perform a
            diagonalization.
        one_body_tensor: The one-body tensor of the Hamiltonian.
        two_body_tensor: The two-body tensor of the Hamiltonian.
        norb: The number of spatial orbitals.
        nelec: The numbers of alpha and beta electrons.
        spin_sq: Target value for the total spin squared for the ground
            state. If ``None``, no spin will be imposed.
        **kwargs: Keyword arguments to pass to
            `pyscf.fci.selected_ci.kernel_fixed_space`
            (https://pyscf.org/pyscf_api_docs/pyscf.fci.html#pyscf.fci.selected_ci.kernel_fixed_space)

    Returns:
        The results of the diagonalizations in the subspaces given by
        ci_strings.
    """
    inputs = [
        (ix, ci_strs, one_body_tensor, two_body_tensor, norb, nelec, spin_sq)
        for ix, ci_strs in enumerate(ci_strings)
    ]
    # fan-out: spawn one worker per input tuple
    print(">>>>> ENTERING WORKER FAN-OUT")
    refs = [_solve_sci_worker(*input_) for input_ in inputs]
    print(">>>>> WAITING ON WORKERS TO FINISH TASKS")
    # fan-in: block until every worker finishes
    results = get(refs)
    print(">>>>> DISTRIBUTED JOBS COMPLETED")
    return results


# A caveat of executing a Python program remotely is
# that the inputs to the remote program must be passed
# over an internet network. Similarly, the outputs
# must be passed back to the local program via the same
# structure. Python objects are not always able to be
# passed over a network, and must be encoded in a
# JSON serializable format.
i_data = JSONDecoder().decode(data)

# i_data has all of the information needed from the
# local program to pick up where the computation left off
# after its submission to the remote environment.
[
    job_id,
    hcore,
    eri,
    num_orbitals,
    nuclear_repulsion_energy,
    num_elec_a,
    num_elec_b,
] = i_data

# Re-convert data back into numpy format, after serialization
hcore = np.array(hcore)
eri = np.array(eri)
nuclear_repulsion_energy = np.float64(nuclear_repulsion_energy)

# Instantiate Runtime Service to retrieve the
# bitstrings from the QPU job. We provided these
# credentials upon Serverless setup.
service = QiskitRuntimeService(
    channel=os.environ.get("QISKIT_IBM_CHANNEL"),
    token=os.environ.get("QISKIT_IBM_TOKEN"),
    instance=os.environ.get("QISKIT_IBM_INSTANCE"),
)

# retrieving the QPU job data from the Serverless side
job = service.job(job_id)
primitive_result = job.result()
pub_result = primitive_result[0]
bit_array = pub_result.data.meas  # Getting the bitstrings

# Pass options to the built-in eigensolver
sci_solver = partial(distribute_solve_sci_batch, spin_sq=0.0, max_cycle=max_cycle)

# List to capture intermediate results
result_history = []


def callback(results: list[SCIResult]):
    result_history.append(results)
    iteration = len(result_history)
    print(f">>>>> SQD ITERATION {iteration}")
    for i, result in enumerate(results):
        print(f">>>>> SUBSAMPLE {i}")
        print(f">>>>> \tENERGY: {result.energy + nuclear_repulsion_energy}")
        print(
            f">>>>> \tSUBSPACE DIMENSION: {np.prod(result.sci_state.amplitudes.shape)}"
        )


result = diagonalize_fermionic_hamiltonian(
    hcore,
    eri,
    bit_array,
    samples_per_batch=samples_per_batch,
    norb=num_orbitals,
    nelec=(num_elec_a, num_elec_b),
    num_batches=num_batches,
    energy_tol=energy_tol,
    occupancies_tol=occupancies_tol,
    max_iterations=max_iterations,
    sci_solver=sci_solver,
    symmetrize_spin=symmetrize_spin,
    carryover_threshold=carryover_threshold,
    callback=callback,
    seed=12345,
)

print(">>>>> EXACT DIAGONALIZATION COMPLETE. CLEANING UP, SERIALIZING DATA.")

# Numpy arrays are not JSON serializable.
# Convert them to List objects before using the JSONEncoder
o_data = JSONEncoder().encode(
    [
        result.energy + nuclear_repulsion_energy,
        result.energy,
        result.rdm1.tolist(),
        result.rdm2.tolist(),
        [x.tolist() for x in result.orbital_occupancies],
        [
            result.sci_state.nelec,
            result.sci_state.norb,
            [x.tolist() for x in result.sci_state.orbital_occupancies()],
            [x.tolist() for x in result.sci_state.rdm()],
        ],
    ]
)  # JSON-safe package

save_result({"outputs": o_data})  # single JSON blob returned to client

source_files/classical_simulation.py:

#!/usr/bin/env python3
from json.encoder import JSONEncoder
from json.decoder import JSONDecoder

from qiskit_serverless import get_arguments, save_result
import pyscf
from pyscf import gto, scf
from pyscf.solvent import pcm
from pyscf.mcscf import avas
import psutil

mem_info = psutil.virtual_memory()  # Get information about virtual memory (RAM)
total_ram_gb = mem_info.total / (1024**3)  # Convert bytes to GB
print(f">>>>> SERVERLESS TOTAL RAM: {total_ram_gb:.2f} GB")

### Argument retrieval
args = get_arguments()
data = args["data"]  # Chemistry Data
i_data = JSONDecoder().decode(data)
[mol_geo, eps, ao_labels] = i_data

print(">>>>> DEFINING MOLECULE")
mol = gto.M()
mol.atom = mol_geo
mol.basis = "cc-pVDZ"
mol.unit = "Ang"
mol.charge = 0
mol.spin = 0
mol.verbose = 0

print(">>>>> BUILDING MOLECULE")
mol.build()

print(">>>>> DEFINING PCM")
cm = pcm.PCM(mol)
cm.eps = eps  # for water
cm.method = "IEF-PCM"

print(">>>>> BUILDING RESTRICTED HARTREE FOCK")
mf = scf.RHF(mol).PCM(cm)  # This is the Final SCF object
mf.kernel(verbose=0)

print(">>>>> RUNNING AVAS")
avas_ = avas.AVAS(mf, ao_labels, with_iao=True, canonicalize=True, verbose=0)
avas_.kernel()
norb, ne_act, mo_avas = avas_.ncas, avas_.nelecas, avas_.mo_coeff

print(">>>>> STARTING CASCI")
mc_pcm = pyscf.mcscf.CASCI(mf, norb, ne_act).PCM(
    cm
)  # Make sure to decorate the CASCI object with PCM
mc_pcm.mo_coeff = mo_avas
# mc_pcm.max_memory = 140000
(CASCI_E, _, _, _, _) = mc_pcm.kernel(verbose=0)
print(f">>>>> CASCI_E: {CASCI_E}")

o_data = JSONEncoder().encode([float(CASCI_E)])  # JSON-safe package
save_result({"outputs": o_data})  # single JSON blob returned to client
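Both scripts rely on the same pattern: numpy tensors are converted to nested lists before JSON encoding on one side, and rebuilt into arrays after decoding on the other. A minimal round-trip sketch of that pattern, with hypothetical stand-in values:

```python
import json

import numpy as np

# Stand-in one-body tensor; json cannot encode numpy arrays directly,
# so convert to nested lists before encoding.
hcore = np.arange(4.0).reshape(2, 2)
payload = json.dumps(["job-id-123", hcore.tolist()])

# ...the payload crosses the network as a string; the remote side decodes
# and rebuilds the array:
job_id, hcore_list = json.loads(payload)
hcore_remote = np.array(hcore_list)

assert np.array_equal(hcore, hcore_remote)
```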
For more information, please see the SQD IEF-PCM Qiskit Function template guide (jointly developed by Cleveland Clinic and IBM) referenced above. See also the qiskit_addon_sqd library.
We need to upload the programs intended to run in the cloud environment, and re-upload them any time their source code changes:
client.upload(
    QiskitFunction(
        title="diagonalization_engine",
        entrypoint="diagonalization_engine.py",  # lives in ./source_files
        working_dir="source_files",
    )
)

client.upload(
    QiskitFunction(
        title="classical_simulation",
        entrypoint="classical_simulation.py",  # lives in ./source_files
        working_dir="source_files",
    )
)

Small-scale simulator example
This tutorial does not make use of a small-scale simulator because the intent is to demonstrate a scalable quantum application that extends beyond the regime of simulator exploration. Instead, we show later on how this method can be implemented using a classical state-of-the-art comparison method called CASCI.
Large-scale hardware example
# This is a useful helper function that displays
# remote job execution details to the user's local machine
def feedback_serverless(serverless_job):
    import time

    # Wait for the job to execute
    print(f">>>>> Serverless job submitted: {serverless_job.job_id}")
    timer = 0
    while timer < 10000:
        status = serverless_job.status()
        if status in ("QUEUED", "INITIALIZING", "RUNNING"):
            print(f">>>>> [{timer}s] Serverless job {serverless_job.job_id}: {status}")
            time.sleep(10)
            timer += 10
        elif status == "ERROR":
            print(f">>>>> Serverless job {serverless_job.job_id}: {status}")
            print(">>>>> Logs:")
            print(serverless_job.logs())
            break
        elif status == "DONE":
            print(f">>>>> Serverless job {serverless_job.job_id}: {status}")
            break
        else:
            break
    return

Step 1: Map classical inputs to a quantum problem
1.1: Initialize molecule object using known molecular geometry
# Reference guide for building molecule structures:
# https://pyscf.org/user/gto.html
# Video tutorial on building molecular objects in PySCF:
# https://www.youtube.com/watch?v=cNC2cY9E9j0
molecule_name = "Methylamine"
methylamine_geo = """
N -0.7154 0.0000 0.0000;
C 0.7154 0.0000 0.0000;
H 1.1069 0.0916 1.0174;
H 1.0996 0.8349 -0.5930;
H 1.0996 -0.9274 -0.4345;
H -1.0625 0.8564 0.4294;
H -1.0625 -0.7661 0.5753;
"""

# Imports
import pyscf
from pyscf import gto  # Deals with molecular initialization
from pyscf import scf  # Self-consistent field (mean-field) methods
# Explicitly defining the Methylamine molecule
mol = gto.M()
mol.atom = methylamine_geo
mol.basis = "cc-pVDZ"
mol.unit = "Ang"
mol.charge = 0
mol.spin = 0
mol.verbose = 0
mol.build()

1.2: Define solvation effects using the Polarizable Continuum Model (PCM)
# You can explore other solvents (such as methanol) by
# retrieving other dielectric parameters from:
# https://gaussian.com/scrf/
from pyscf.solvent import pcm
eps_water = 78.3553  # If solvating in a different medium,
# set this constant appropriately using a known value

cm = pcm.PCM(mol)
cm.eps = eps_water # PySCF defaults to water solvation,
# but here we show this solvation parameter explicitly
cm.method = "IEF-PCM"  # Alternative solvation models include C-PCM, SS(V)PE, COSMO

# Create a "Restricted Hartree-Fock" object for the solute,
# then wrap the SCF object with a Polarizable Continuum Model
mf_pcm0 = scf.RHF(mol).PCM(cm)
# Restricted Hartree-Fock misses instantaneous correlations;
# post-HF methods like CCSD, CI, MP2 might be worth exploring

1.3: Geometry optimization using TRIC
# Geometry optimization with geomeTRIC
from pyscf.geomopt.geometric_solver import (
    optimize,
)  # geomeTRIC under the hood, for geometry optimization

mol_opt = optimize(mf_pcm0, tol_grad=3e-4, verbose=0)  # Uses geomeTRIC/TRIC under the hood

1.4: Prepare continuum model and mean field object with relevant variables
from pyscf.mcscf import avas
# Re-define PCM
cm = pcm.PCM(mol_opt)
cm.eps = eps_water # for water
cm.method = "IEF-PCM"
# Re-build Restricted Hartree Fock object
mf_opt = scf.RHF(mol_opt).PCM(cm)
mf_opt.kernel(verbose=0)
# Run AVAS
ao_labels = ["C 2s", "C 2p", "N 2s", "N 2p", "H 1s"]
avas_ = avas.AVAS(
mf_opt, ao_labels, with_iao=True, canonicalize=True, verbose=0
)
avas_.kernel()
norb, ne_act, mo_avas = avas_.ncas, avas_.nelecas, avas_.mo_coeff
num_elec_a = (ne_act + mol_opt.spin) // 2
num_elec_b = (ne_act - mol_opt.spin) // 2

Step 2: Optimize problem for quantum hardware execution
For more information on the helper functions demonstrated here, see the Sample-based quantum diagonalization of a chemistry Hamiltonian tutorial.
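The layout-scoring helper below sums the two-qubit gate errors over a layout's couplings plus the readout errors of its qubits, and keeps the lowest-scoring candidate. A toy sketch of that idea with made-up error values (the dictionaries and layouts here are illustrative, not from a real backend):

```python
# Hypothetical calibration data: two-qubit gate error per coupling and
# readout error per qubit.
two_q_error = {(0, 1): 0.010, (1, 2): 0.008, (2, 3): 0.020}
readout_error = {0: 0.01, 1: 0.02, 2: 0.01, 3: 0.05}


def score(layout_edges, layout_qubits):
    """Sum 2Q gate errors on the layout's couplings and readout errors
    of its qubits, exactly as the scoring heuristic below does."""
    gate = sum(two_q_error[e] for e in layout_edges)
    meas = sum(readout_error[q] for q in layout_qubits)
    return gate + meas


layout_a = score([(0, 1), (1, 2)], [0, 1, 2])  # 0.018 + 0.04
layout_b = score([(1, 2), (2, 3)], [1, 2, 3])  # 0.028 + 0.08
best = min([("a", layout_a), ("b", layout_b)], key=lambda s: s[1])
print(best[0])  # layout "a" has the lower error score
```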
# Standard SQD helper functions (From SQD Tutorial)
from typing import Sequence
import rustworkx
from qiskit.providers import BackendV2
from qiskit import QuantumCircuit, QuantumRegister
from rustworkx import NoEdgeBetweenNodes, PyGraph
IBM_TWO_Q_GATES = {"cx", "ecr", "cz"}
def create_linear_chains(num_orbitals: int) -> PyGraph:
"""In zig-zag layout, there are two linear chains (with connecting
qubits between the chains). This function creates those two linear
chains: a rustworkx PyGraph with two disconnected linear chains.
Each chain contains `num_orbitals` number of nodes, that is, in the
final graph there are `2 * num_orbitals` number of nodes.
Args:
num_orbitals (int): Number orbitals or nodes in each linear chain.
They are also known as alpha-alpha interaction qubits.
Returns:
A rustworkx.PyGraph with two disconnected linear chains each with
`num_orbitals` number of nodes.
"""
G = rustworkx.PyGraph()
for n in range(num_orbitals):
G.add_node(n)
for n in range(num_orbitals - 1):
G.add_edge(n, n + 1, None)
for n in range(num_orbitals, 2 * num_orbitals):
G.add_node(n)
for n in range(num_orbitals, 2 * num_orbitals - 1):
G.add_edge(n, n + 1, None)
return G
def create_lucj_zigzag_layout(
num_orbitals: int, backend_coupling_graph: PyGraph
) -> tuple[PyGraph, int]:
"""This function creates the complete zigzag graph that 'can be mapped'
to an IBM QPU with heavy-hex connectivity (the zigzag must be an
isomorphic sub-graph to the QPU/backend coupling graph for it to be
mapped). The zigzag pattern includes both linear chains (alpha-alpha
interactions) and connecting qubits between the linear chains
(alpha-beta interactions).
Args:
num_orbitals (int): Number of orbitals, that is, number of nodes in
each alpha-alpha linear chain.
backend_coupling_graph (PyGraph): The coupling graph of the backend
on which the LUCJ ansatz will be mapped and run. This function takes
the coupling graph as a undirected `rustworkx.PyGraph` where there
is only one 'undirected' edge between two nodes, that is, qubits.
Usually, the coupling graph of a IBM backend is directed (for
example, Eagle devices such as ibm_brisbane) or may have two edges
between two nodes (for example, Heron `ibm_torino`). A user
needs to make such graphs undirected or remove duplicate edges
(or do both) to make them compatible with this function.
Returns:
G_new (PyGraph): The graph with IBM backend compliant zigzag pattern.
num_alpha_beta_qubits (int): Number of connecting qubits between the
linear chains in the zigzag pattern. While we want as many
connecting (alpha-beta) qubits between the linear (alpha-alpha)
chains, we cannot accommodate all due to qubit and connectivity
constraints of backends. This is the maximum number of connecting
qubits the zigzag pattern can have while being backend compliant
(that is, isomorphic to backend coupling graph).
"""
isomorphic = False
G = create_linear_chains(num_orbitals=num_orbitals)
num_iters = num_orbitals
while not isomorphic:
G_new = G.copy()
num_alpha_beta_qubits = 0
for n in range(num_iters):
if n % 4 == 0:
new_node = 2 * num_orbitals + num_alpha_beta_qubits
G_new.add_node(new_node)
G_new.add_edge(n, new_node, None)
G_new.add_edge(new_node, n + num_orbitals, None)
num_alpha_beta_qubits = num_alpha_beta_qubits + 1
isomorphic = rustworkx.is_subgraph_isomorphic(
backend_coupling_graph, G_new
)
num_iters -= 1
return G_new, num_alpha_beta_qubits
def lightweight_layout_error_scoring(
backend: BackendV2,
virtual_edges: Sequence[Sequence[int]],
physical_layouts: Sequence[int],
two_q_gate_name: str,
) -> list[list[list[int], float]]:
"""Lightweight and heuristic function to score isomorphic layouts. There
can be many zigzag patterns, each with different set of physical qubits,
that can be mapped to a backend. Some of them might include fewer noise
qubits and couplings than others. This function computes a simple error
score for each such layout. It sums up 2Q gate error for all couplings
in the zigzag pattern (layout) and measurement of errors of physical
qubits in the layout to compute the error score.
Note:
This lightweight scoring can be refined using concepts such as
mapomatic.
Args:
backend (BackendV2): A backend.
virtual_edges (Sequence[Sequence[int]]): Edges in the device-
compliant zigzag pattern where nodes are numbered from 0 to (2 *
num_orbitals + num_alpha_beta_qubits).
physical_layouts (Sequence[int]): All physical layouts of the zigzag
pattern that are isomorphic to each other and to the larger backend
coupling map.
two_q_gate_name (str): The name of the two-qubit gate of the
backend. The name is used for fetching two-qubit gate error from
backend properties.
Returns:
scores (list): A list of lists where each sublist contains two
items. First item is the layout, and second item is a float
representing error score of the layout. The layouts in the `scores`
are sorted in the ascending order of error score.
"""
props = backend.properties()
scores = []
for layout in physical_layouts:
total_2q_error = 0
for edge in virtual_edges:
physical_edge = (layout[edge[0]], layout[edge[1]])
try:
ge = props.gate_error(two_q_gate_name, physical_edge)
except Exception:
ge = props.gate_error(two_q_gate_name, physical_edge[::-1])
total_2q_error += ge
total_measurement_error = 0
for qubit in layout:
meas_error = props.readout_error(qubit)
total_measurement_error += meas_error
scores.append([layout, total_2q_error + total_measurement_error])
return sorted(scores, key=lambda x: x[1])
def _make_backend_cmap_pygraph(backend: BackendV2) -> PyGraph:
graph = backend.coupling_map.graph
if not graph.is_symmetric():
graph.make_symmetric()
backend_coupling_graph = graph.to_undirected()
edge_list = backend_coupling_graph.edge_list()
removed_edge = []
for edge in edge_list:
if set(edge) in removed_edge:
continue
try:
backend_coupling_graph.remove_edge(edge[0], edge[1])
removed_edge.append(set(edge))
except NoEdgeBetweenNodes:
pass
return backend_coupling_graph
def get_zigzag_physical_layout(
num_orbitals: int, backend: BackendV2, score_layouts: bool = True
) -> tuple[list[int], int]:
"""The main function that generates the zigzag pattern
with physical qubits that can be used as an `intial_layout` in a
preset passmanager/transpiler.
Args:
num_orbitals (int): Number of orbitals.
backend (BackendV2): A backend.
score_layouts (bool): Optional. If `True`, it uses the
`lightweight_layout_error_scoring` function to score the
isomorphic layouts and returns the layout with
fewer erroneous qubits.
If `False`, returns the first isomorphic subgraph.
Returns:
A tuple of device compliant layout (list[int]) with zigzag pattern
and an int representing number of alpha-beta-interactions.
"""
backend_coupling_graph = _make_backend_cmap_pygraph(backend=backend)
G, num_alpha_beta_qubits = create_lucj_zigzag_layout(
num_orbitals=num_orbitals,
backend_coupling_graph=backend_coupling_graph,
)
isomorphic_mappings = rustworkx.vf2_mapping(
backend_coupling_graph, G, subgraph=True
)
isomorphic_mappings = list(isomorphic_mappings)
edges = list(G.edge_list())
layouts = []
for mapping in isomorphic_mappings:
initial_layout = [None] * (2 * num_orbitals + num_alpha_beta_qubits)
for key, value in mapping.items():
initial_layout[value] = key
layouts.append(initial_layout)
two_q_gate_name = IBM_TWO_Q_GATES.intersection(
backend.configuration().basis_gates
).pop()
if score_layouts:
scores = lightweight_layout_error_scoring(
backend=backend,
virtual_edges=edges,
physical_layouts=layouts,
two_q_gate_name=two_q_gate_name,
)
return scores[0][0][:-num_alpha_beta_qubits], num_alpha_beta_qubits
return layouts[0][:-num_alpha_beta_qubits], num_alpha_beta_qubits

from qiskit.transpiler import generate_preset_pass_manager
import ffsim
# Initial LUCJ ansatz layout
initial_layout, _ = get_zigzag_physical_layout(norb, backend=backend)
# Initialize a pass manager
pass_manager = generate_preset_pass_manager(
optimization_level=3, backend=backend, initial_layout=initial_layout
)
pass_manager.pre_init = ffsim.qiskit.PRE_INIT

Steps 3 & 4: Execute using Qiskit and post-process using Qiskit Serverless
Here, we combine Step 3 (Execute) and Step 4 (Post-process) because the implicit solvent model requires an iterative feedback loop: several cycles of quantum execution and classical post-processing that progressively improve the final computation.
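The shape of that loop can be sketched with stubs. This is a structural illustration only, not the real implementation: `run_loop` and its sample counting are hypothetical, but the `resample` semantics match the parameter described later (resample=1 resamples the QPU after every update loop; resample=n_iter samples the QPU only once).

```python
def run_loop(n_iter, resample):
    """Count how often the QPU would be sampled across the update loops."""
    sample_count = 0
    for i in range(n_iter):
        if i % resample == 0:
            sample_count += 1  # stand-in for submitting a Sampler job
        # stand-in for diagonalization and orbital update would go here
    return sample_count


assert run_loop(15, 1) == 15   # resample on every update loop
assert run_loop(15, 15) == 1   # sample the QPU only once
assert run_loop(15, 5) == 3    # sample every fifth loop
```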
3.1: Compute the restricted Hartree-Fock energy
# Run the kernel to get the RHF energy
mf_opt = scf.RHF(mol_opt).PCM(cm)
hf_e = float(mf_opt.kernel())
print(f"Restricted Hartree-Fock Energy: {hf_e}")

3.2: Establish classical reference energy with CASCI
# Setup a Serverless Client
worker = client.load("classical_simulation")

from json.encoder import JSONEncoder
ao_labels = ["C 2s", "C 2p", "N 2s", "N 2p", "H 1s"]
data_e = JSONEncoder().encode([mol_opt.tostring(), eps_water, ao_labels])

serverless_job = worker.run(data=data_e)

# Optionally, check the Serverless status feedback
# Don't sit here and stare at the feedback unless debugging.
# You can go develop something else while the Serverless job runs.
_ = feedback_serverless(serverless_job)

# If you make a mistake and need to cancel something
# for job in client.jobs():
# job.cancel()

from json.decoder import JSONDecoder

CASCI_E = JSONDecoder().decode(serverless_job.result()["outputs"])[0]

# We have approximated the red, classical baseline from
# Figure 1 for Methanol (North-West panel)
print(f"CASCI/IEF-PCM(cc-pVDZ): E={CASCI_E}")

Configure application parameters
# Systematically vary these parameters to improve hardware results
# Set to "True" to run on real hardware
use_hardware = True
# Error suppression/mitigation options
# >> Configure within Sampler primitive
# Transpiler Options
optimization_level = 3
# Heartwood algorithm options
n_iter = 15 # How many update loops to run
resample = 1 # (resample=1 -> resample the QPU after every
# update loop; resample=n_iter -> sample QPU only once)
shots = 10000
# SQD options
energy_tol = 1e-4
occupancies_tol = 1e-3
max_iterations = 12
# Eigenstate solver options
num_batches = 5
samples_per_batch = 300
symmetrize_spin = True
carryover_threshold = 1e-5
max_cycle = 200
# Classical post-processing options
local = False  # Remote, Serverless (False) versus Local Post-Processing (True)
mem = 16  # Memory allocated to each diagonalization worker (GB)

# Heartwood algorithm subroutines
import os
import numpy as np
import pyscf
from pyscf import ao2mo, cc
from functools import reduce
import ffsim
from json.encoder import JSONEncoder
from json.decoder import JSONDecoder
import time
from functools import partial
from qiskit_addon_sqd.fermion import (
SCIResult,
diagonalize_fermionic_hamiltonian,
solve_sci_batch,
)
def update_rdm(casci_object, dmas):
"""
Inputs:
casci_object -> CASCI object
dmas -> Spin-summed 1-particle reduced density matrix
This function returns the CASCI/SQD one-body density matrix in
the full basis of atomic orbitals, written as the sum (last line)
of two terms:
- a contribution from the core orbitals,
np.dot(mocore, mocore.conj().T) * 2, (core = inactive and doubly-occupied)
- a contribution from the active-space orbitals and electrons (dmas)
rotated from the active-space to the AO basis (the reduce operation)
Outputs:
rho_approximation: The CASCI/SQD one-body density matrix
in the full basis of atomic orbitals
"""
mo_coeff = casci_object.mo_coeff
ncore = casci_object.ncore
ncas = casci_object.ncas
mocore = mo_coeff[:, :ncore]
mocas = mo_coeff[:, ncore : ncore + ncas]
dm1 = np.dot(mocore, mocore.conj().T) * 2
rho_approximation = dm1 + reduce(np.dot, (mocas, dmas, mocas.conj().T))
return rho_approximation
def run_active_space_calculation(
h1e_cas, h2e_cas, norb, ne_act, orbs, fermilevel, ecore
):
# ----- perform an HF and a CCSD calculation in the active space
from pyscf import tools
from datetime import datetime
now = datetime.now().strftime("%H:%M:%S")
print(">>>>> ACTIVE SPACE CALCULATIONS ")
tools.fcidump.from_integrals(
f"as_fcidump_{now}.txt",
h1e_cas,
h2e_cas,
norb,
ne_act,
ms=0,
nuc=ecore,
) # Forcefully represents the active space in the correct structure
mf_as = tools.fcidump.to_scf(f"as_fcidump_{now}.txt")
os.remove(f"as_fcidump_{now}.txt")
mf_as.kernel()
print(">>>>> RUNNING CCSD")
mf_cc = cc.CCSD(mf_as)
mf_cc.kernel()
orbts = mf_as.mo_coeff
t1, t2 = mf_cc.t1, mf_cc.t2
print(">>>>> UPDATED t1, t2 PARAMETERS")
# ----- update the HF orbitals
active = list(
range(fermilevel - ne_act // 2, fermilevel - ne_act // 2 + norb)
)
orbs[:, active] = np.dot(orbs[:, active], orbts)
return orbs, t1, t2
def get_lucj(norb, num_elec_a, num_elec_b, t1, t2, n_reps=1):
print(">>>>> CONSTRUCTING LUCJ CIRCUIT")
alpha_alpha_indices = [(p, p + 1) for p in range(norb - 1)]
alpha_beta_indices = [(p, p) for p in range(0, norb, 4)]
ucj_op = ffsim.UCJOpSpinBalanced.from_t_amplitudes(
t1=t1, # <---- Update t1 each loop
t2=t2, # <---- Update t2 each loop
n_reps=n_reps,
interaction_pairs=(alpha_alpha_indices, alpha_beta_indices),
)
nelec = (num_elec_a, num_elec_b)
# create an empty quantum circuit
qubits = QuantumRegister(2 * norb, name="q")
circuit = QuantumCircuit(qubits)
# prepare Hartree-Fock state as the reference state
# and append it to the quantum circuit
circuit.append(ffsim.qiskit.PrepareHartreeFockJW(norb, nelec), qubits)
# apply the UCJ operator to the reference state
circuit.append(ffsim.qiskit.UCJOpSpinBalancedJW(ucj_op), qubits)
circuit.measure_all()
return circuit
# Classical diagonalization engine sent to HPC
def classically_diagonalize(
bit_array=None, # Bit string array (only needed if locally processing data)
nuclear_repulsion_energy=None, # Electronic energy from the core orbitals
hcore=None, # 1-electron hamiltonian integrals
eri=None, # 2-electron hamiltonian integrals
num_orbitals=None, # Number of spatial orbitals
nelec=None, # Number of electrons
num_elec_a=None, # Alpha orbitals
num_elec_b=None, # Beta orbitals
job_id=None, # QPU bitstring Job ID
client=None, # Diagonalization engine worker
energy_tol=1e-4, # SQD option
occupancies_tol=1e-3, # SQD option
max_iterations=12, # SQD option
num_batches=8, # Eigenstate solver option
samples_per_batch=300, # Eigenstate solver option
symmetrize_spin=False, # Eigenstate solver option
carryover_threshold=1e-5, # Eigenstate solver option
max_cycle=200, # Eigenstate solver option
local=True, # Remote vs Local Diagonalization
mem=16.0, # Memory per Serverless worker (GB)
):
print(">>>>> STARTING DIAGONALIZATION ENGINE ")
# Pass options to the built-in eigensolver. If you just want to use
# the defaults, you can omit this step, in which case you would not
# specify the sci_solver argument in the call to
# diagonalize_fermionic_hamiltonian below.
if local:
sci_solver = partial(
solve_sci_batch, spin_sq=0.0, max_cycle=max_cycle
)
# List to capture intermediate results
result_history = []
def callback(results: list[SCIResult]):
result_history.append(results)
iteration = len(result_history)
print(f">>>>> SQD ITERATION {iteration}")
for i, result in enumerate(results):
print(f">>>>> SUBSAMPLE {i}")
print(
f">>>>> \tENERGY: {result.energy + nuclear_repulsion_energy}"
)
print(
f">>>>> \tSUBSPACE DIMENSION: {np.prod(result.sci_state.amplitudes.shape)}"
)
result = diagonalize_fermionic_hamiltonian(
hcore,
eri,
bit_array,
samples_per_batch=samples_per_batch,
norb=num_orbitals,
nelec=(nelec // 2, nelec // 2),
num_batches=num_batches,
energy_tol=energy_tol,
occupancies_tol=occupancies_tol,
max_iterations=max_iterations,
sci_solver=sci_solver,
symmetrize_spin=symmetrize_spin,
carryover_threshold=carryover_threshold,
callback=callback,
seed=12345,
)
result = (result.energy, result.rdm1, result.rdm2)
else:
# Serverless Logic
print(
f">>>>> SENDING QISKIT RUNTIME JOB {job_id} TO QISKIT SERVERLESS"
)
data = [
job_id,
hcore.tolist(),
eri.tolist(),
int(num_orbitals),
float(nuclear_repulsion_energy),
int(num_elec_a),
int(num_elec_b),
]
# Encode the execution dependencies with the JSONEncoder
data_e = JSONEncoder().encode(data)
# Send to Serverless
worker = client.load("diagonalization_engine")
serverless_job = worker.run(
data=data_e,
energy_tol=energy_tol, # SQD option
occupancies_tol=occupancies_tol, # SQD option
max_iterations=max_iterations, # SQD option
symmetrize_spin=symmetrize_spin, # Eigenstate solver option
carryover_threshold=carryover_threshold, # Eigenstate solver option
num_batches=num_batches, # Eigenstate solver option
samples_per_batch=samples_per_batch, # Eigenstate solver option
max_cycle=max_cycle, # Eigenstate solver option
mem=mem, # Memory per worker (GB)
)
# Wait for the job to execute
_ = feedback_serverless(serverless_job)
o_data = JSONDecoder().decode(serverless_job.result()["outputs"])
result = (o_data[1], np.array(o_data[2]), np.array(o_data[3]))
print(f">>>>>>>>>> Active Space Energy: {o_data[1]}")
print(f">>>>>>>>>> rdm1: {o_data[2]}")
print(f">>>>>>>>>> rdm2: {o_data[3]}")
return result
# The Heartwood algorithm
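The payload handed to the Serverless worker is plain JSON, so numpy arrays must be flattened with `tolist()` before encoding and rebuilt with `np.array()` after decoding, as the code above does. A round-trip sketch with hypothetical array sizes, using the same standard-library `JSONEncoder`/`JSONDecoder`:

```python
import numpy as np
from json import JSONDecoder, JSONEncoder

# Hypothetical payload mirroring the one sent to the Serverless worker:
# [job_id, hcore, eri, num_orbitals, nuclear_repulsion_energy, n_a, n_b]
hcore = np.arange(4.0).reshape(2, 2)
eri = np.arange(16.0).reshape(2, 2, 2, 2)
data = ["job-id-placeholder", hcore.tolist(), eri.tolist(), 2, -1.5, 1, 1]

# Encode on the client, decode on the worker.
data_e = JSONEncoder().encode(data)
decoded = JSONDecoder().decode(data_e)

# Arrays survive the round trip once converted back with np.array().
print(np.array_equal(np.array(decoded[1]), hcore))  # True
print(np.array(decoded[2]).shape)                   # (2, 2, 2, 2)
```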
import numpy as np
from qiskit_ibm_runtime import SamplerV2 as Sampler
from qiskit_addon_sqd.counts import generate_bit_array_uniform
mc = pyscf.mcscf.CASCI(mf_opt, ncas=norb, nelecas=ne_act).PCM(cm)
mc.with_solvent.method = mf_opt.with_solvent.method # Here we make sure
# that mc is also using the same solvent method defined earlier (IEF-PCM)
mc.with_solvent.eps = mf_opt.with_solvent.eps # Set the dielectric parameters
mc.mo_coeff = mo_avas.copy() # Update the molecular orbitals to include
# those computed in the presence of the solvent
h1e_cas, ecore = (
mc.get_h1eff()
) # <-- h1e_cas holds the 1-electron Hamiltonian integrals in the active
# space; ecore is the electronic energy from the core orbitals.
h2e_cas = ao2mo.restore(
1, mc.get_h2eff(), norb
) # <-- get the 2-electron Hamiltonian integrals
mc.mo_coeff, t1, t2 = run_active_space_calculation(
h1e_cas,
h2e_cas,
norb,
ne_act,
mo_avas.copy(),
mf_opt.mol.nelectron // 2,
ecore,
)
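With `h1e_cas`, `h2e_cas`, and `ecore` in hand, the active-space electronic energy that SQD estimates is a contraction of these integrals with the 1- and 2-particle reduced density matrices returned by the diagonalization step. A schematic numpy check on a made-up one-orbital, two-electron system (illustrative numbers only, not the methylamine data, and with a simplified 2-RDM normalization):

```python
import numpy as np

# Illustrative one-spatial-orbital, two-electron system (made-up values).
ecore = 1.0                   # core electronic + nuclear-repulsion energy
h1 = np.array([[-1.25]])      # 1-electron integrals
eri = np.array([[[[0.65]]]])  # 2-electron integrals
rdm1 = np.array([[2.0]])      # doubly occupied orbital
rdm2 = np.array([[[[2.0]]]])  # corresponding (simplified) 2-RDM

# E = ecore + sum_pq h_pq D_pq + 1/2 sum_pqrs (pq|rs) d_pqrs
energy = (
    ecore
    + np.einsum("pq,pq->", h1, rdm1)
    + 0.5 * np.einsum("pqrs,pqrs->", eri, rdm2)
)
print(round(energy, 6))  # -0.85
```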
# Sampler primitive options
sampler = Sampler(mode=backend)
# Explore error suppression techniques and see if they can improve result quality
sampler.options.dynamical_decoupling.enable = True
sampler.options.dynamical_decoupling.sequence_type = "XY4"
sampler.options.twirling.enable_measure = True
sampler.options.environment.job_tags = ["TUT_ISC"]
# sampler.options.twirling.enable_gates = False
# sampler.options.twirling.num_randomizations = 10
# sampler.options.twirling.shots_per_randomization = 1024
# initial approximation for rdm1
with_solvent_e, with_solvent_v = None, None # Don't touch
data = []
for iiter in range(n_iter):
print(f">>>>> IMPLICIT SOLVENT ITERATION {iiter+1}/{n_iter}")
if with_solvent_v is not None:
# Subsequent update loops enter here
mc.get_hcore = lambda *args: mc._scf.get_hcore() + with_solvent_v
else:
# First update loop starts here
# hcore is the CAS space (classically computed) 1-electron
# hamiltonian, which we default to at the start of the routine.
mc.get_hcore = (
lambda *args: mc._scf.get_hcore()
) # REF: https://pyscf.org/pyscf_api_docs/pyscf.mcscf.html#pyscf.scf.hf.CASBase.get_h1cas
# Alias mapping
# hcore : h1e_cas : h1e_eff
# nuclear_repulsion_energy : ecore
# eri : h2e_cas : h2e_eff
h1e_cas, ecore = (
mc.get_h1eff()
) # <-- h1e_cas holds the 1-electron Hamiltonian integrals in the active
# space; ecore is the electronic energy from the core orbitals.
h2e_cas = ao2mo.restore(
1, mc.get_h2eff(), norb
) # <-- get the 2-electron Hamiltonian integrals
mc.mo_coeff, t1, t2 = run_active_space_calculation(
h1e_cas,
h2e_cas,
norb,
ne_act,
mo_avas.copy(),
mf_opt.mol.nelectron // 2,
ecore,
)
if use_hardware:
if (
iiter % resample == 0
): # <-- The resample variable controls how often the bitstrings
# are refreshed on hardware; resampling every iteration is
# recommended, but you are free to choose otherwise.
# The "Quantum-Centric" part
print(">>>>> GENERATING BITSTRINGS USING QUANTUM HARDWARE")
# LUCJ Ansatz construction
circuit = get_lucj(norb, num_elec_a, num_elec_b, t1, t2, n_reps=1)
print(f">>>>> TRANSPILING LUCJ TO {backend.name}")
isa_circuit = pass_manager.run(circuit)
print(f">>>>> SUBMITTING ISA_CIRCUIT TO {backend.name}")
job = sampler.run(
[isa_circuit], shots=shots
) # <----- Error suppression/mitigation configured upstream
job_id = str(job.job_id())
timer = 0
while job.status() != "DONE":
timer += 10
print(
f">>>>> [{timer}s] RUNTIME JOB {job_id}: {job.status()}"
)
time.sleep(10)
primitive_result = job.result()
print(f">>>>> RETRIEVED {job_id} FROM {backend.name}")
pub_result = primitive_result[0]
bit_array = pub_result.data.meas
else:
print(">>>>> GENERATING BITSTRINGS CLASSICALLY")
rng = np.random.default_rng(24)
bit_array = generate_bit_array_uniform(
100_000, 2 * norb, rand_seed=rng
) # <-- Sample bitstrings from a uniform distribution. This is
# useful for debug, but runs out of steam on large systems
job_id = float(
"nan"
) # <-- placeholder; downstream checks use this to verify whether
# valid hardware job IDs were passed
local = True
# The "Classical Post-processing" part
result = classically_diagonalize(
bit_array=bit_array,
nuclear_repulsion_energy=ecore, # Electronic energy from the core orbitals
hcore=h1e_cas, # 1-electron Hamiltonian integrals
eri=h2e_cas, # 2-electron Hamiltonian integrals
num_orbitals=norb, # Number of spatial orbitals
nelec=ne_act, # Number of electrons
num_elec_a=ne_act // 2, # Number of alpha electrons
num_elec_b=ne_act // 2, # Number of beta electrons
job_id=job_id, # QPU bitstring Job ID
client=client, # Diagonalization engine worker
energy_tol=energy_tol, # SQD option
occupancies_tol=occupancies_tol, # SQD option
max_iterations=max_iterations, # SQD option
num_batches=num_batches, # Eigenstate solver option
samples_per_batch=samples_per_batch, # Eigenstate solver option
symmetrize_spin=symmetrize_spin, # Eigenstate solver option
carryover_threshold=carryover_threshold, # Eigenstate solver option
max_cycle=max_cycle, # Eigenstate solver option
local=local, # Remote vs Local Diagonalization
mem=mem, # Memory per worker (GB)
)
# e : SQD-based estimate of the energy
# rdm1: Spin-summed 1-particle reduced density matrix
e, rdm1 = result[0], result[1]
rho_approximation = update_rdm(
mc, rdm1
).copy() # <--- Reconstruct the one-body density matrix in the
# atomic orbital basis to update the external potential due to
# the solvent
if with_solvent_e is not None:
# Subsequent update loops enter here
edup = np.einsum(
"ij,ji->", with_solvent_v, rho_approximation
) # <-- edup: Incrementing the energy calculation with
# subsequent iterations
e += ecore + with_solvent_e - edup
else:
# First update loop enters here
e += (
ecore # Pulled from the CAS space object (molecule's core energy)
)
# Outputs:
# with_solvent_e : scalar energy correction due to solvent polarization
# with_solvent_v : Fock-like matrix to be added to the core Hamiltonian in SCF
with_solvent_e, with_solvent_v = mc.with_solvent._get_vind(
rho_approximation
)
data.append((iiter, float(e), job_id))
print(f">>>>> END ITERATION {iiter}")
print(f">>>>> TOTAL ENERGY: {e}\n")
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
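The double-counting correction `edup` computed inside the loop above is the trace of the solvent potential against the approximate one-body density, `Tr(V rho)`. A numpy sketch with hypothetical symmetric matrices (stand-ins for the actual `with_solvent_v` and `rho_approximation`):

```python
import numpy as np

rng = np.random.default_rng(7)
n = 4  # hypothetical AO-basis size

# Symmetric stand-ins for the solvent potential and one-body density.
with_solvent_v = rng.standard_normal((n, n))
with_solvent_v += with_solvent_v.T
rho = rng.standard_normal((n, n))
rho += rho.T

# edup = Tr(V @ rho): the solute-solvent interaction energy already
# counted in with_solvent_e, subtracted to avoid double counting.
edup = np.einsum("ij,ji->", with_solvent_v, rho)
print(np.isclose(edup, np.trace(with_solvent_v @ rho)))  # True
```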
def plot_data(data, baseline=0, name=None, save=False):
x_vals, y_vals, job_ids = zip(*data)
fig, ax = plt.subplots(figsize=(10, 6))
# Plot line + markers
ax.plot(
x_vals,
y_vals,
color="navy",
linewidth=2,
marker="o",
markersize=5,
label="Energy trajectory",
)
ax.axhline(
baseline,
color="red",
linestyle="--",
linewidth=1.5,
label="Reference energy",
)
# Force plain formatting
ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=True))
ax.ticklabel_format(style="plain", axis="y")
# Annotate each point with its exact value
for x, y, job_id in zip(x_vals, y_vals, job_ids):
ax.annotate(
f"{y:.8f}, ID: {job_id}",
(x, y),
textcoords="offset points",
xytext=(0, 8), # vertical offset
ha="center",
fontsize=8,
rotation=25,
color="navy",
)
# Annotate the Classical Reference line
for x, y in zip([0.5], [baseline]):
ax.annotate(
f"{y:.5f}",
(x, y),
textcoords="offset points",
xytext=(0, 8), # vertical offset
ha="center",
fontsize=8,
rotation=25,
color="red",
)
# Titles, labels, etc
ax.set_title(
f"SQD/IEF-PCM(cc-pVDZ) - {name}\nEnergy Convergence",
fontsize=14,
fontweight="bold",
pad=15,
)
ax.set_xlabel("Update Iterations", fontsize=12)
ax.set_ylabel("Total Energy (Hartrees)", fontsize=12)
ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.7)
ax.legend(frameon=True, loc="best")
plt.tight_layout()
if save:
plt.savefig(f"./results/{name}_energy_convergence.png")
return fig, ax
# Plot your data
fig, ax = plot_data(data, baseline=CASCI_E, name=molecule_name, save=True)
plt.show()
Next steps
If you found this work interesting, you might be interested in the following resources:
- The Quantum Resource Management Interface (QRMI) — integrate quantum and classical resources into HPC workload managers like Slurm, extending the distributed computing pattern demonstrated in this tutorial
- Guide to Qiskit Serverless
- Qiskit Serverless GitHub
- Sample-based quantum diagonalization of a chemistry Hamiltonian tutorial
- Sample-based quantum diagonalization (SQD) overview
- Deploy and run a template for electronic structure simulation with an implicit solvent model (SQD IEF-PCM Qiskit Function template, jointly developed by Cleveland Clinic and IBM)