# This code is a Qiskit project.
# (C) Copyright IBM 2024.
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Code for generating quasiprobability weights."""
from __future__ import annotations
from collections.abc import Sequence
from collections import Counter
from enum import Enum
import itertools
import logging
import math
import numpy as np
import numpy.typing as npt
from .qpd_basis import QPDBasis
from ..utils.iteration import strict_zip
logger = logging.getLogger(__name__)
# math.sin(math.pi) is just above 1e-16, so at 1e-14, we are well above that.
# Numbers like this can often come up in the QPD coefficients.
_NONZERO_ATOL = 1e-14
[docs]
class WeightType(Enum):
"""Type of weight associated with a QPD sample."""
#: A weight given in proportion to its exact weight
EXACT = 1
#: A weight that was determined through some sampling procedure
SAMPLED = 2
def _min_filter_nonzero(vals: npt.NDArray[np.float64], *, atol=_NONZERO_ATOL):
return np.min(vals[np.logical_not(np.isclose(vals, 0, atol=atol))])
def __update_running_product_after_increment(
running_product: list[float],
state: Sequence[int],
coeff_probabilities: Sequence[npt.NDArray[np.float64]],
):
"""Update the ``running_product`` list after the ``state`` has been incremented.
This snippet is used twice in
:func:`_generate_exact_weights_and_conditional_probabilities_assume_sorted`;
hence, we write it only once here.
"""
try:
prev = running_product[-2]
except IndexError:
prev = 1.0
running_product[-1] = prev * coeff_probabilities[len(state) - 1][state[-1]]
def _generate_exact_weights_and_conditional_probabilities_assume_sorted(
coeff_probabilities: Sequence[npt.NDArray[np.float64]], threshold: float
):
r"""Determine all exact weights above ``threshold`` and the conditional probabilities necessary to sample efficiently from all other weights.
Each yielded element will be a 2-tuple, the first element of which will be
a ``state``, represented by a tuple of ``int``\ s.
If ``state`` contains the same number of elements as
``coeff_probabilities``, then the second element will be a ``float``,
greater than or equal to ``threshold``, that contains the exact probability
of this ``state``.
If, on the other hand, ``state`` contains fewer elements than
``coeff_probabilities``, then the second element is a 1D ``np.ndarray`` of
the conditional probabilities that can be used to sample the remaining
weights that have not been evaluated exactly (i.e., are below
``threshold``). These will all be normalized *except* the
top-level one, which is given by ``state == ()``.
If there are no exact weights below a given level, then the conditional
probabilities at that level are equivalent to the corresponding element of
`coeff_probabilities``, and this generator does not yield them. Skipping
redundant conditional probabilities, like this, allows for efficient
sampling without traversing the entire tree, which is desirable since the
tree grows exponentially with the number of cut gates. By contrast, the
performance with the provided strategy is no worse than linear in
``threshold``, in both time and memory.
This function assumes each element of ``coeff_probabilities`` contains
non-negative numbers, ordered largest to smallest. For a generalization
that allows ``coeff_probabilities`` to be provided in any order, see
:func:`_generate_exact_weights_and_conditional_probabilities`.
"""
assert len(coeff_probabilities) > 0
next_pop = False # Stores whether the next move is a pop or not
state = [0]
running_product = [coeff_probabilities[0][0]]
running_conditional_probabilities: list[npt.NDArray[np.float64]] = []
while True:
if next_pop:
# Pop
state.pop()
if len(state) + 1 == len(running_conditional_probabilities):
# There were some exact weights found below us, so we need to
# yield the conditional probabilities, unless all probabilities
# at this level are zero *and* it's not the top level.
current_condprobs = running_conditional_probabilities.pop()
current_condprobs[
np.isclose(current_condprobs, 0, atol=_NONZERO_ATOL)
] = 0.0
if not state:
# Don't normalize the top-level conditional probabilities.
yield (), current_condprobs
else:
norm = np.sum(current_condprobs)
if norm != 0:
# Some, but not all, weights below us are exact. If,
# instead, _all_ weights had been exact, we could have
# skipped this yield, as there is zero probability of
# reaching this partial state when sampling.
yield tuple(state), current_condprobs / norm
# Update the factor one level up from the popped one.
running_conditional_probabilities[-1][state[-1]] *= norm
if not state:
# We're all done
return
running_product.pop()
# Increment the state counter.
state[-1] += 1
if state[-1] != len(coeff_probabilities[len(state) - 1]):
# Increment successful (no overflow). We don't need to pop again.
next_pop = False
__update_running_product_after_increment(
running_product, state, coeff_probabilities
)
else:
if running_product[-1] < threshold:
next_pop = True
elif len(state) < len(coeff_probabilities):
# Append 0 to work toward a "full" `state`
running_product.append(
running_product[-1] * coeff_probabilities[len(state)][0]
)
state.append(0)
else:
# `state` is full. Yield first.
yield tuple(state), running_product[-1]
# Since we found something exact, we need to update running_conditional_probabilities.
while len(running_conditional_probabilities) < len(coeff_probabilities):
running_conditional_probabilities.append(
np.array(
coeff_probabilities[len(running_conditional_probabilities)],
dtype=float,
)
)
# It's exact, so we want no probability of sampling it going forward.
running_conditional_probabilities[-1][state[-1]] = 0
# Increment the state counter.
state[-1] += 1
if state[-1] == len(coeff_probabilities[-1]):
# The state counter has overflowed, so our next move should be a
# pop.
next_pop = True
else:
__update_running_product_after_increment(
running_product, state, coeff_probabilities
)
def _invert_permutation(p):
# https://stackoverflow.com/questions/11649577/how-to-invert-a-permutation-array-in-numpy
s = np.empty(p.size, p.dtype)
s[p] = np.arange(p.size)
return s
def _generate_exact_weights_and_conditional_probabilities(
coeff_probabilities: Sequence[npt.NDArray[np.float64]], threshold: float
):
"""Generate exact weights and conditional probabilities.
This is identical in behavior to
:func:`_generate_exact_weights_and_conditional_probabilities_assume_sorted`,
except it makes no assumption on the order of the coefficients.
"""
permutations = [np.argsort(cp)[::-1] for cp in coeff_probabilities]
sorted_coeff_probabilities = [
cp[permutation] for cp, permutation in zip(coeff_probabilities, permutations)
]
ipermutations = [_invert_permutation(p) for p in permutations]
for (
coeff_indices,
probability,
) in _generate_exact_weights_and_conditional_probabilities_assume_sorted(
sorted_coeff_probabilities, threshold
):
orig_coeff_indices = tuple(
perm[idx] for perm, idx in zip(permutations, coeff_indices)
)
if len(coeff_indices) != len(sorted_coeff_probabilities):
# In this case, `probability` is actually a vector of conditional
# probabilities, so apply the inverse permutation.
probability = probability[ipermutations[len(coeff_indices)]]
yield orig_coeff_indices, probability
[docs]
def generate_qpd_weights(
qpd_bases: Sequence[QPDBasis], num_samples: float = 1000
) -> dict[tuple[int, ...], tuple[float, WeightType]]:
"""Generate weights from the joint quasiprobability distribution.
Each weight whose absolute value is above a threshold of ``1 /
num_samples`` will be evaluated exactly. The remaining weights -- those in
the tail of the distribution -- will then be sampled from, resulting in at
most ``num_samples`` unique weights.
Args:
qpd_bases: The :class:`QPDBasis` objects from which to generate weights
num_samples: Controls the number of weights to generate
Returns:
A mapping from a given decomposition to its weight.
Keys are tuples of indices -- one index per input :class:`QPDBasis`. The indices
correspond to a specific decomposition mapping in the basis.
Values are tuples. The first element is a number corresponding to the
weight of the contribution. The second element is the :class:`WeightType`,
either ``EXACT`` or ``SAMPLED``.
"""
independent_probabilities = [np.asarray(basis.probabilities) for basis in qpd_bases]
# In Python 3.7 and higher, dicts are guaranteed to remember their
# insertion order. For user convenience, we sort by exact weights first,
# then sampled weights. Within each, values are sorted largest to
# smallest.
lst = sorted(
_generate_qpd_weights(independent_probabilities, num_samples).items(),
key=lambda x: ((v := x[1])[1].value, -v[0]),
)
return dict(lst)
def _generate_qpd_weights(
independent_probabilities: Sequence[npt.NDArray[np.float64]],
num_samples: float,
*,
_samples_multiplier: int = 1,
) -> dict[tuple[int, ...], tuple[float, WeightType]]:
if not num_samples >= 1:
raise ValueError("num_samples must be at least 1.")
retval: dict[tuple[int, ...], tuple[float, WeightType]] = {}
threshold = 1 / num_samples
# Determine if the smallest *nonzero* probability is above the threshold implied by
# num_samples. If so, then we can evaluate all weights exactly.
smallest_probability = np.prod(
[_min_filter_nonzero(probs) for probs in independent_probabilities]
)
if smallest_probability >= threshold:
# All weights exactly
logger.info("All exact weights")
multiplier = num_samples if math.isfinite(num_samples) else 1.0
for map_ids in itertools.product(
*[range(len(probs)) for probs in independent_probabilities]
):
probability = np.prod(
[
probs[i]
for i, probs in strict_zip(map_ids, independent_probabilities)
]
)
if probability < _NONZERO_ATOL:
continue
retval[map_ids] = (multiplier * probability, WeightType.EXACT)
return retval
conditional_probabilities: dict[tuple[int, ...], npt.NDArray[np.float64]] = {}
weight_to_sample = 1.0
largest_probability = np.prod(
[np.max(probs) for probs in independent_probabilities]
)
if not largest_probability >= threshold:
logger.info("No exact weights")
else:
logger.info("Some exact weights")
for (
map_ids,
probability,
) in _generate_exact_weights_and_conditional_probabilities(
independent_probabilities, threshold
):
# As described in the docstring to
# _generate_exact_weights_and_conditional_probabilities_assume_sorted,
# there are two possible pieces of information that might be
# yielded by the generator. The following branch distinguishes
# between them.
if len(map_ids) == len(independent_probabilities):
# The generator produced a state together with its exact probability.
weight = probability * num_samples
retval[map_ids] = (weight, WeightType.EXACT)
else:
# The generator produced a partial state along with a vector of
# conditional probabilities.
conditional_probabilities[map_ids] = probability
if map_ids == ():
weight_to_sample = np.sum(probability)
conditional_probabilities[map_ids] /= weight_to_sample
# Loop through each gate and sample from the remainder of the distribution.
# Start by rescaling.
weight_to_sample *= num_samples
# The following variable, `samples_needed`, must be integer and at least 1.
# `_samples_multiplier` will typically be 1, but we set it higher in
# testing to collect additional statistics, faster.
samples_needed = math.ceil(weight_to_sample) * _samples_multiplier
# At the time of writing, the below assert should never fail. But if
# future code changes result in inputs where it _may_ fail, then the only
# thing that should be needed if this is reached is to return `retval` in
# this case, since presumably it must contain all weights as exact weights.
assert samples_needed >= 1
single_sample_weight = weight_to_sample / samples_needed
# Figure out if we've reached the special case where everything except
# *one* weight has been calculated exactly, so there's only one thing left
# to sample. If that's the case, then we can calculate that one remaining
# weight exactly as well and skip sampling.
if conditional_probabilities:
running_state: tuple[int, ...] = ()
while len(running_state) < len(independent_probabilities):
# If it's missing from `conditional_probabilities`, that just means
# to use the corresponding entry in `independent_probabilities`.
try:
probs = conditional_probabilities[running_state]
except KeyError:
probs = independent_probabilities[len(running_state)]
x = np.flatnonzero(probs)
assert len(x) != 0
if len(x) > 1:
break
running_state += (x[0],)
else:
assert running_state not in retval
retval[running_state] = (weight_to_sample, WeightType.EXACT)
return retval
# Form the joint samples, collecting them into a dict with counts for each.
random_samples: dict[tuple[int, ...], int] = {}
_populate_samples(
random_samples,
samples_needed,
independent_probabilities,
conditional_probabilities,
)
# Insert the samples into the dict we are about to return.
for outcome, count in random_samples.items():
assert outcome not in retval
retval[outcome] = (count * single_sample_weight, WeightType.SAMPLED)
return retval
def _populate_samples(
random_samples: dict[tuple[int, ...], int],
num_desired: int,
independent_probabilities: Sequence,
conditional_probabilities: dict[tuple[int, ...], npt.NDArray[np.float64]],
running_state: tuple[int, ...] = (),
) -> None:
"""Generate random samples from the conditional probabilitity distributions.
Items get populated into the ``random_samples`` dict, rather than returned.
This function is designed to call itself recursively. The number of
elements in ``running_state`` will match the recursion depth.
"""
if running_state not in conditional_probabilities:
# Everything below us is sampled, so we can sample directly from the
# remaining independent probability distributions.
samples_by_decomp = []
for probs in independent_probabilities[len(running_state) :]:
samples_by_decomp.append(
np.random.choice(range(len(probs)), num_desired, p=probs)
)
for outcome, count in Counter(zip(*samples_by_decomp)).items():
assert (running_state + outcome) not in random_samples
random_samples[running_state + outcome] = count
return
# There are some exact weight(s) below us, so we must consider the
# conditional probabilities at the current level.
probs = conditional_probabilities[running_state]
current_outcomes = np.random.choice(range(len(probs)), num_desired, p=probs)
for current_outcome, count in Counter(current_outcomes).items():
outcome = running_state + (current_outcome,)
if len(outcome) == len(independent_probabilities):
# It's a full one
assert outcome not in random_samples
random_samples[outcome] = count
else:
# Recurse
_populate_samples(
random_samples,
count,
independent_probabilities,
conditional_probabilities,
outcome,
)