# This code is a Qiskit project.
# (C) Copyright IBM 2023.
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Functions for reconstructing the results of circuit cutting experiments."""
from __future__ import annotations
from collections.abc import Sequence, Hashable, Mapping
import numpy as np
from qiskit.quantum_info import PauliList
from qiskit.primitives import (
SamplerResult, # for SamplerV1
PrimitiveResult, # for SamplerV2
)
from .utils.observable_grouping import CommutingObservableGroup, ObservableCollection
from .utils.bitwise import bit_count
from .cutting_decomposition import decompose_observables
from .cutting_experiments import _get_pauli_indices
from .qpd import WeightType
[docs]
def reconstruct_expectation_values(
results: (
SamplerResult
| PrimitiveResult
| dict[Hashable, SamplerResult | PrimitiveResult]
),
coefficients: Sequence[tuple[float, WeightType]],
observables: PauliList | dict[Hashable, PauliList],
) -> list[float]:
r"""Reconstruct an expectation value from the results of the sub-experiments.
Args:
results: The results from running the cutting subexperiments. If the cut circuit
was not partitioned between qubits and run separately, this argument should be
a :class:`~qiskit.primitives.SamplerResult` instance or a dictionary mapping
a single partition to the results. If the circuit was partitioned and its
pieces were run separately, this argument should be a dictionary mapping partition labels
to the results from each partition's subexperiments.
The subexperiment results are expected to be ordered in the same way the subexperiments
are ordered in the output of :func:`.generate_cutting_experiments` -- one result for every
sample and observable, as shown below. The Qiskit Sampler primitive will return the results
in the same order the experiments are submitted, so users who do not use :func:`.generate_cutting_experiments`
to generate their experiments should take care to order their subexperiments as follows before submitting them
to the sampler primitive:
:math:`[sample_{0}observable_{0}, \ldots, sample_{0}observable_{N-1}, sample_{1}observable_{0}, \ldots, sample_{M-1}observable_{N-1}]`
coefficients: A sequence containing the coefficient associated with each unique subexperiment. Each element is a tuple
containing the coefficient (a ``float``) together with its :class:`.WeightType`, which denotes
how the value was generated. The contribution from each subexperiment will be multiplied by
its corresponding coefficient, and the resulting terms will be summed to obtain the reconstructed expectation value.
observables: The observable(s) for which the expectation values will be calculated.
This should be a :class:`~qiskit.quantum_info.PauliList` if ``results`` is a
:class:`~qiskit.primitives.SamplerResult` instance. Otherwise, it should be a
dictionary mapping partition labels to the observables associated with that partition.
Returns:
A ``list`` of ``float``\ s, such that each float is an expectation
value corresponding to the input observable in the same position
Raises:
ValueError: ``observables`` and ``results`` are of incompatible types.
ValueError: An input observable has a phase not equal to 1.
"""
# If circuit was not separated, transform input data structures to
# dictionary format. Perform some input validation in either case.
if isinstance(observables, PauliList):
if not isinstance(results, (SamplerResult, PrimitiveResult)):
raise ValueError(
"If observables is a PauliList, results must be a SamplerResult or PrimitiveResult instance."
)
if any(obs.phase != 0 for obs in observables):
raise ValueError("An input observable has a phase not equal to 1.")
subobservables_by_subsystem: Mapping[Hashable, PauliList] = (
decompose_observables(observables, "A" * len(observables[0]))
)
results_dict: Mapping[Hashable, SamplerResult | PrimitiveResult] = {
"A": results
}
expvals = np.zeros(len(observables))
elif isinstance(observables, Mapping):
if not isinstance(results, Mapping):
raise ValueError(
"If observables is a dictionary, results must also be a dictionary."
)
if observables.keys() != results.keys():
raise ValueError(
"The subsystem labels of the observables and results do not match."
)
results_dict = results
for label, subobservable in observables.items():
if any(obs.phase != 0 for obs in subobservable):
raise ValueError("An input observable has a phase not equal to 1.")
subobservables_by_subsystem = observables
expvals = np.zeros(len(list(observables.values())[0]))
else:
raise ValueError("observables must be either a PauliList or dict.")
subsystem_observables = {
label: ObservableCollection(subobservables)
for label, subobservables in subobservables_by_subsystem.items()
}
# Validate that the number of subexperiments executed is consistent with
# the number of coefficients and observable groups.
for label, so in subsystem_observables.items():
current_result = results_dict[label]
if isinstance(current_result, SamplerResult):
# SamplerV1 provides a SamplerResult
current_result = current_result.quasi_dists
if len(current_result) != len(coefficients) * len(so.groups):
raise ValueError(
f"The number of subexperiments performed in subsystem '{label}' "
f"({len(current_result)}) should equal the number of coefficients "
f"({len(coefficients)}) times the number of mutually commuting "
f"subobservable groups ({len(so.groups)}), but it does not."
)
# Reconstruct the expectation values
for i, coeff in enumerate(coefficients):
current_expvals = np.ones((len(expvals),))
for label, so in subsystem_observables.items():
subsystem_expvals = [
np.zeros(len(cog.commuting_observables)) for cog in so.groups
]
current_result = results_dict[label]
for k, cog in enumerate(so.groups):
idx = i * len(so.groups) + k
if isinstance(current_result, SamplerResult):
# SamplerV1 provides a SamplerResult
quasi_probs = current_result.quasi_dists[idx]
for outcome, quasi_prob in quasi_probs.items():
subsystem_expvals[k] += quasi_prob * _process_outcome(
cog, outcome
)
else:
# SamplerV2 provides a PrimitiveResult
data_pub = current_result[idx].data
obs_array = data_pub.observable_measurements.array
qpd_array = data_pub.qpd_measurements.array
shots = qpd_array.shape[0]
for j in range(shots):
obs_outcomes = int.from_bytes(obs_array[j], "big")
qpd_outcomes = int.from_bytes(qpd_array[j], "big")
subsystem_expvals[k] += (1 / shots) * _process_outcome_v2(
cog, obs_outcomes, qpd_outcomes
)
for k, subobservable in enumerate(subobservables_by_subsystem[label]):
current_expvals[k] *= np.mean(
[subsystem_expvals[m][n] for m, n in so.lookup[subobservable]]
)
expvals += coeff[0] * current_expvals
return list(expvals)
def _process_outcome(
cog: CommutingObservableGroup, outcome: int | str, /
) -> np.typing.NDArray[np.float64]:
"""Process a single outcome of a QPD experiment with observables.
Args:
cog: The observable set being measured by the current experiment
outcome: The outcome of the classical bits
Returns:
A 1D array of the observable measurements. The elements of
this vector correspond to the elements of ``cog.commuting_observables``,
and each result will be either +1 or -1.
"""
num_meas_bits = len(_get_pauli_indices(cog))
outcome = _outcome_to_int(outcome)
obs_outcomes = outcome & ((1 << num_meas_bits) - 1)
qpd_outcomes = outcome >> num_meas_bits
return _process_outcome_v2(cog, obs_outcomes, qpd_outcomes)
def _process_outcome_v2(
cog: CommutingObservableGroup, obs_outcomes: int, qpd_outcomes: int, /
) -> np.typing.NDArray[np.float64]:
"""Process a single outcome of a QPD experiment with observables.
Args:
cog: The observable set being measured by the current experiment
obs_outcomes: An integer containing the outcome bits of the ``observable_measurements`` register
qpd_outcomes: An integer containing the outcome bits of the ``qpd_measurements`` register
Returns:
A 1D array of the observable measurements. The elements of
this vector correspond to the elements of ``cog.commuting_observables``,
and each result will be either +1 or -1.
"""
# qpd_factor will be -1 or +1, depending on the overall parity of qpd
# measurements.
qpd_factor = 1 - 2 * (bit_count(qpd_outcomes) & 1)
rv = np.zeros(len(cog.pauli_bitmasks))
for i, mask in enumerate(cog.pauli_bitmasks):
# obs will be -1 or +1, depending on the measurement
# of the current operator.
obs = 1 - 2 * (bit_count(obs_outcomes & mask) & 1)
rv[i] = qpd_factor * obs
return rv
def _outcome_to_int(outcome: int | str) -> int:
if isinstance(outcome, int):
return outcome
outcome = outcome.replace(" ", "")
if len(outcome) < 2 or outcome[1] in ("0", "1"):
outcome = outcome.replace(" ", "")
return int(f"0b{outcome}", 0)
return int(outcome, 0)