Source code for mthree.mitigation

# This code is part of Mthree.
#
# (C) Copyright IBM 2021.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
# pylint: disable=no-name-in-module, invalid-name
"""Calibration data"""

import warnings
import threading
import datetime
from math import ceil
from time import perf_counter
import logging

import psutil
import numpy as np
import orjson
import runningman as rm
from runningman.utils import is_ibm_backend

from mthree.circuits import (
    _tensor_meas_states,
    _marg_meas_states,
    balanced_cal_strings,
    balanced_cal_circuits,
)
from mthree.direct import direct_solver as direct_solve
from mthree.direct import reduced_cal_matrix as cal_matrix
from mthree.iterative import iterative_solver

from mthree.exceptions import M3Error
from mthree.classes import QuasiCollection
from ._helpers import system_info

logger = logging.getLogger(__name__)


[docs] class M3Mitigation: """Main M3 calibration class.""" def __init__(self, system=None, iter_threshold=4096): """Main M3 calibration class. Parameters: system (Backend): Target backend. iter_threshold (int): Sets the bitstring count at which iterative mode is turned on (assuming reasonable error rates). Attributes: system (Backend): The target system or execution manager. system_info (dict): Information needed about the system cal_method (str): Calibration method used cal_timestamp (str): Time at which cals were taken single_qubit_cals (list): 1Q calibration matrices """ if is_ibm_backend(system): system = rm.RunningManBackend(system) self.system = system self.system_info = system_info(system) if system else {} self.single_qubit_cals = None self.num_qubits = self.system_info["num_qubits"] if system else None self.iter_threshold = iter_threshold self.cal_shots = None self.cal_method = "balanced" self.cal_timestamp = None # The time at which the cals result was generated self.rep_delay = None # attributes for handling threaded job self._thread = None self._job_error = None # Holds the cals file self.cals_file = None # faulty qubits self.faulty_qubits = [] # The number of shots used for balanced denominator self._balanced_shots = None def __getattribute__(self, attr): """This allows for checking the status of the threaded cals call For certain attr this will join the thread and/or raise an error. """ __dict__ = super().__getattribute__("__dict__") if attr in __dict__: if attr in ["single_qubit_cals"]: self._thread_check() return super().__getattribute__(attr) def _form_cals(self, qubits): """Form the 1D cals array from tensored cals data Parameters: qubits (array_like): The qubits to calibrate over. Returns: ndarray: 1D Array of float cals data. """ qubits = np.asarray(qubits, dtype=int) cals = np.zeros(4 * qubits.shape[0], dtype=np.float32) # Reverse index qubits for easier indexing later for kk, qubit in enumerate(qubits[::-1]): cals[4 * kk : 4 * kk + 4] = self.single_qubit_cals[qubit].ravel() return cals
[docs] def tensored_cals_from_system( self, qubits=None, shots=None, method="balanced", rep_delay=None, cals_file=None ): """Grab calibration data from system. Parameters: qubits (array_like): Qubits over which to correct calibration data. Default is all. shots (int): Number of shots per circuit. Default is min(1e4, max_shots). method (str): Type of calibration, 'balanced' (default), 'independent', or 'marginal'. rep_delay (float): Delay between circuits on IBM Quantum backends. cals_file (str): Output path to write JSON calibration data to. """ warnings.warn("This method is deprecated, use 'cals_from_system' instead.") self.cals_from_system( qubits=qubits, shots=shots, method=method, rep_delay=rep_delay, cals_file=cals_file, )
[docs] def cals_from_system( self, qubits=None, shots=None, method=None, initial_reset=False, rep_delay=None, cals_file=None, async_cal=True, runtime_mode=None, ): """Grab calibration data from system. Parameters: qubits (array_like): Qubits over which to correct calibration data. Default is all. shots (int): Number of shots per circuit. min(1e4, max_shots). method (str): Type of calibration, 'balanced' (default for hardware), 'independent' (default for simulators), or 'marginal'. initial_reset (bool): Use resets at beginning of calibration circuits, default=False. rep_delay (float): Delay between circuits on IBM Quantum backends. cals_file (str): Output path to write JSON calibration data to. async_cal (bool): Do calibration async in a separate thread, default is True. runtime_mode (Batch or Session): Mode to run jobs in if using IBM system, default=None Returns: list: List of jobs submitted. Raises: M3Error: Called while a calibration currently in progress. """ if self._thread: raise M3Error("Calibration currently in progress.") if isinstance(self.system, rm.RunningManBackend): if runtime_mode: self.system.set_mode(runtime_mode, overwrite=True) elif not self.system.get_mode(): self.system.set_mode("batch") if qubits is None: qubits = range(self.num_qubits) # Remove faulty qubits if any if any(self.system_info["inoperable_qubits"]): qubits = list( filter( lambda item: item not in self.system_info["inoperable_qubits"], list(range(self.num_qubits)), ) ) warnings.warn( "Backend reporting inoperable qubits." + " Skipping calibrations for: {}".format( self.system_info["inoperable_qubits"] ) ) if method is None: method = "balanced" if self.system_info["simulator"]: method = "independent" self.cal_method = method self.rep_delay = rep_delay self.cals_file = cals_file self.cal_timestamp = None jobs = self._grab_additional_cals( qubits, shots=shots, method=method, rep_delay=rep_delay, initial_reset=initial_reset, async_cal=async_cal, ) return jobs
[docs] def cals_from_file(self, cals_file): """Generated the calibration data from a previous runs output Parameters: cals_file (str): A string path to the saved counts file from an earlier run. Raises: M3Error: Calibration in progress. """ if self._thread: raise M3Error("Calibration currently in progress.") with open(cals_file, "r", encoding="utf-8") as fd: loaded_data = orjson.loads(fd.read()) if isinstance(loaded_data, dict): self.single_qubit_cals = [ np.asarray(cal, dtype=np.float32) if cal else None for cal in loaded_data["cals"] ] self.cal_timestamp = loaded_data["timestamp"] self.cal_shots = loaded_data.get("shots", None) else: warnings.warn("Loading from old M3 file format. Save again to update.") self.cal_timestamp = None self.cal_shots = None self.single_qubit_cals = [ np.asarray(cal, dtype=np.float32) if cal else None for cal in loaded_data ] self.faulty_qubits = _faulty_qubit_checker(self.single_qubit_cals)
[docs] def cals_to_file(self, cals_file=None): """Save calibration data to JSON file. Parameters: cals_file (str): File in which to store calibrations. Raises: M3Error: Calibration filename missing. M3Error: Mitigator is not calibrated. """ if not cals_file: raise M3Error("cals_file must be explicitly set.") if not self.single_qubit_cals: raise M3Error("Mitigator is not calibrated.") save_dict = { "timestamp": self.cal_timestamp, "backend": self.system_info.get("name", None), "shots": self.cal_shots, "cals": self.single_qubit_cals, } with open(cals_file, "wb") as fd: fd.write(orjson.dumps(save_dict, option=orjson.OPT_SERIALIZE_NUMPY))
[docs] def tensored_cals_from_file(self, cals_file): """Generated the tensored calibration data from a previous runs output Parameters: cals_file (str): A string path to the saved counts file from an earlier run. """ warnings.warn("This method is deprecated, use 'cals_from_file' instead.") self.cals_from_file(cals_file)
[docs] def cals_from_matrices(self, matrices): """Init calibration data from list of NumPy arrays. Missing entires are set to None elements. Parameters: matrices (list_like): List of cals as NumPy arrays Raises: M3Error: If system set error if list length != num_qubits on system """ matrices = list(matrices) if self.num_qubits: if len(matrices) != self.num_qubits: raise M3Error( "Input list length not equal to" " number of qubits {} != {}".format(len(matrices), self.num_qubits) ) self.single_qubit_cals = matrices self.faulty_qubits = _faulty_qubit_checker(self.single_qubit_cals)
[docs] def cals_to_matrices(self): """Return single qubit cals as list of NumPy arrays Returns: list: List of cals as NumPy arrays """ return self.single_qubit_cals.copy()
def _grab_additional_cals( self, qubits, shots=None, method="balanced", rep_delay=None, initial_reset=False, async_cal=False, ): """Grab missing calibration data from backend. Parameters: qubits (array_like): List of measured qubits. shots (int): Number of shots to take, min(1e4, max_shots). method (str): Type of calibration, 'balanced' (default), 'independent', or 'marginal'. rep_delay (float): Delay between circuits on IBM Quantum backends. initial_reset (bool): Use resets at beginning of calibration circuits, default=False. async_cal (bool): Do calibration async in a separate thread, default is False. Raises: M3Error: Backend not set. M3Error: Faulty qubits found. """ if self.system is None: raise M3Error("System is not set. Use 'cals_from_file'.") if self.single_qubit_cals is None: self.single_qubit_cals = [None] * self.num_qubits if self.cal_shots is None: if shots is None: shots = min(self.system_info["max_shots"], 10000) self.cal_shots = shots if self.rep_delay is None: self.rep_delay = rep_delay logger.info( "Grabbing calibration data for qubits=%s, method=%s, async_cal=%s", qubits, method, async_cal, ) if method not in ["independent", "balanced", "marginal"]: raise M3Error(f"Invalid calibration method {method}.") if isinstance(qubits, dict): # Assuming passed a mapping qubits = list(set(qubits.values())) elif isinstance(qubits, list): # Check if passed a list of mappings if isinstance(qubits[0], dict): # Assuming list of mappings, need to get unique elements _qubits = [] for item in qubits: _qubits.extend(list(set(item.values()))) qubits = list(set(_qubits)) # Do check for inoperable qubits here inoperable_overlap = list( set(qubits) & set(self.system_info["inoperable_qubits"]) ) if any(inoperable_overlap): raise M3Error( "Attempting to calibrate inoperable qubits: {}".format( inoperable_overlap ) ) num_cal_qubits = len(qubits) cal_strings = [] # shots is needed here because balanced cals will use a value # different from cal_shots shots = self.cal_shots logger.info("Generating calibration circuits.") if method == "marginal": trans_qcs = _marg_meas_states( qubits, self.num_qubits, initial_reset=initial_reset ) elif method == "balanced": cal_strings = balanced_cal_strings(num_cal_qubits) trans_qcs = balanced_cal_circuits( cal_strings, qubits, self.num_qubits, initial_reset=initial_reset ) shots = self.cal_shots // num_cal_qubits if self.cal_shots / num_cal_qubits != shots: shots += 1 self._balanced_shots = shots * num_cal_qubits # Independent else: trans_qcs = [] for kk in qubits: trans_qcs.extend( _tensor_meas_states( kk, self.num_qubits, initial_reset=initial_reset ) ) num_circs = len(trans_qcs) max_circuits = self.system_info["max_circuits"] # Determine the number of jobs required num_jobs = ceil(num_circs / max_circuits) logger.info( "Generated %s circuits, which will run in %s jobs using %s shots", num_circs, num_jobs, shots, ) # Get the slice length circ_slice = ceil(num_circs / num_jobs) circs_list = [ trans_qcs[kk * circ_slice : (kk + 1) * circ_slice] for kk in range(num_jobs - 1) ] + [trans_qcs[(num_jobs - 1) * circ_slice :]] # Do job submission here jobs = [] for circs in circs_list: _job = self.system.run( circs, shots=shots, rep_delay=self.rep_delay, job_tags=["M3 calibration"], ) jobs.append(_job) # Execute job and cal building in new thread. self._job_error = None if async_cal: thread = threading.Thread( target=_job_thread, args=(jobs, self, qubits, num_cal_qubits, cal_strings), ) self._thread = thread self._thread.start() else: _job_thread(jobs, self, qubits, num_cal_qubits, cal_strings) return jobs
[docs] def apply_correction( self, counts, qubits, distance=None, method="auto", max_iter=25, tol=1e-4, return_mitigation_overhead=False, details=False, ): """Applies correction to given counts. Parameters: counts (dict, list): Input counts dict or list of dicts. qubits (dict, array_like): Qubits on which measurements applied. distance (int): Distance to correct for. Default=num_bits method (str): Solution method: 'auto', 'direct' or 'iterative'. max_iter (int): Max. number of iterations, Default=25. tol (float): Convergence tolerance of iterative method, Default=1e-4. return_mitigation_overhead (bool): Returns the mitigation overhead, default=False. details (bool): Return extra info, default=False. Returns: QuasiDistribution or QuasiCollection: Dictionary of quasiprobabilities if input is a single dict, else a collection of quasiprobabilities. Raises: M3Error: Bitstring length does not match number of qubits given. """ logger.info("Apply correction to %s bitstrings", len(counts)) if len(counts) == 0: raise M3Error("Input counts is any empty dict.") given_list = False if isinstance(counts, (list, np.ndarray)): given_list = True if not given_list: counts = [counts] if isinstance(qubits, dict): # If a mapping was given for qubits qubits = [list(qubits.values())] elif not any(isinstance(qq, (list, tuple, np.ndarray, dict)) for qq in qubits): qubits = [qubits] * len(counts) else: if isinstance(qubits[0], dict): # assuming passed a list of mappings qubits = [list(qu.values()) for qu in qubits] if len(qubits) != len(counts): raise M3Error("Length of counts does not match length of qubits.") # Check if using faulty qubits bad_qubits = set() for item in qubits: for qu in item: if qu in self.faulty_qubits: bad_qubits.add(qu) if any(bad_qubits): warnings.warn("Using faulty qubits: {}".format(bad_qubits)) quasi_out = [] details_out = [] log_iter = max(len(counts) // 20, 1) logger.info("Start applying correction using method %s", method) for idx, cnts in enumerate(counts): if logger.isEnabledFor(logging.DEBUG): if idx % log_iter == 0: logger.debug("Applying correction %s/%s", idx, len(counts)) st = perf_counter() corrected = self._apply_correction( cnts, qubits=qubits[idx], distance=distance, method=method, max_iter=max_iter, tol=tol, return_mitigation_overhead=return_mitigation_overhead, details=details, ) if logger.isEnabledFor(logging.DEBUG): dur = perf_counter() - st if dur > 1: logger.warning("It look %s seconds to process %s", dur, cnts) if details: logger.debug("Correction details: %s", corrected[1]) if details: quasi_out.append(corrected[0]) details_out.append(corrected[1]) else: quasi_out.append(corrected) logger.info("All done applying correction") if not given_list: if details: return quasi_out[0], details_out[0] return quasi_out[0] quasi_out = QuasiCollection(quasi_out) if details: return quasi_out, details_out return quasi_out
def _apply_correction( self, counts, qubits, distance=None, method="auto", max_iter=25, tol=1e-3, return_mitigation_overhead=False, details=False, ): """Applies correction to given counts. Parameters: counts (dict): Input counts dict. qubits (array_like): Qubits on which measurements applied. distance (int): Distance to correct for. Default=num_bits method (str): Solution method: 'auto', 'direct' or 'iterative'. max_iter (int): Max. number of iterations, Default=25. tol (float): Convergence tolerance of iterative method, Default=1e-3. return_mitigation_overhead (bool): Returns the mitigation overhead, default=False. details (bool): Return extra info, default=False. Returns: QuasiDistribution: Dictionary of quasiprobabilities. Raises: M3Error: Bitstring length does not match number of qubits given. """ # This is needed because counts is a Counts object in Qiskit not a dict. counts = dict(counts) shots = sum(counts.values()) # If distance is None, then assume min(num_bits, 3). num_bits = len(qubits) num_elems = len(counts) if distance is None: distance = min(num_bits, 3) elif distance == -1: # shortcut for setting max distance distance = num_bits # check if len of bitstrings does not equal number of qubits passed. bitstring_len = len(next(iter(counts))) if bitstring_len != num_bits: raise M3Error( "Bitstring length ({}) does not match".format(bitstring_len) + " number of qubits ({})".format(num_bits) ) # Check if no cals done yet if self.single_qubit_cals is None: warnings.warn("No calibration data. Calibrating: {}".format(qubits)) self._grab_additional_cals(qubits, method=self.cal_method) # Check if one or more new qubits need to be calibrated. missing_qubits = [qq for qq in qubits if self.single_qubit_cals[qq] is None] if any(missing_qubits): warnings.warn( "Computing missing calibrations for qubits: {}".format(missing_qubits) ) self._grab_additional_cals(missing_qubits, method=self.cal_method) if method == "auto": current_free_mem = psutil.virtual_memory().available / 1024**3 # First check if direct method can be run if num_elems <= self.iter_threshold and ( (num_elems**2 + num_elems) * 8 / 1024**3 < current_free_mem / 2 ): method = "direct" else: method = "iterative" if method == "direct": st = perf_counter() mit_counts, col_norms, gamma = direct_solve( self, counts, qubits, distance, return_mitigation_overhead ) dur = perf_counter() - st mit_counts.shots = shots if gamma is not None: mit_counts.mitigation_overhead = gamma * gamma if details: info = {"method": "direct", "time": dur, "dimension": num_elems} info["col_norms"] = col_norms return mit_counts, info return mit_counts elif method == "iterative": iter_count = np.zeros(1, dtype=int) def callback(_): iter_count[0] += 1 if details: st = perf_counter() mit_counts, col_norms, gamma = iterative_solver( self, counts, qubits, distance, tol, max_iter, 1, callback, return_mitigation_overhead, ) dur = perf_counter() - st mit_counts.shots = shots if gamma is not None: mit_counts.mitigation_overhead = gamma * gamma info = {"method": "iterative", "time": dur, "dimension": num_elems} info["iterations"] = iter_count[0] info["col_norms"] = col_norms return mit_counts, info # pylint: disable=unbalanced-tuple-unpacking mit_counts, gamma = iterative_solver( self, counts, qubits, distance, tol, max_iter, 0, callback, return_mitigation_overhead, ) logger.info(f"Number of GMRES iterations: {iter_count[0]}") mit_counts.shots = shots if gamma is not None: mit_counts.mitigation_overhead = gamma * gamma return mit_counts else: raise M3Error("Invalid method: {}".format(method))
[docs] def reduced_cal_matrix(self, counts, qubits, distance=None): """Return the reduced calibration matrix used in the solution. Parameters: counts (dict): Input counts dict. qubits (array_like): Qubits on which measurements applied. distance (int): Distance to correct for. Default=num_bits Returns: ndarray: 2D array of reduced calibrations. dict: Counts in order they are displayed in matrix. Raises: M3Error: If bit-string length does not match passed number of qubits. """ return cal_matrix(self, counts, qubits, distance)
[docs] def readout_fidelity(self, qubits=None): """Compute readout fidelity for calibrated qubits. Parameters: qubits (array_like): Qubits to compute over, default is all. Returns: list: List of qubit fidelities. Raises: M3Error: Mitigator is not calibrated. M3Error: Qubit indices out of range. """ if self.single_qubit_cals is None: raise M3Error("Mitigator is not calibrated") if qubits is None: qubits = range(self.num_qubits) else: outliers = [kk for kk in qubits if kk >= self.num_qubits] if any(outliers): raise M3Error( "One or more qubit indices out of range: {}".format(outliers) ) fids = [] for kk in qubits: qubit = self.single_qubit_cals[kk] if qubit is not None: fids.append(np.mean(qubit.diagonal())) else: fids.append(None) return fids
def _thread_check(self): """Check if a thread is running and join it. Raise an error if one is given. """ if self._thread and self._thread != threading.current_thread(): self._thread.join() self._thread = None if self._job_error: raise self._job_error # pylint: disable=raising-bad-type
def _job_thread(jobs, mit, qubits, num_cal_qubits, cal_strings): """Run the calibration job in a different thread and post-process Parameters: jobs (list): A list of job instances mit (M3Mitigator): The mitigator instance qubits (list): List of qubits used num_cal_qubits (int): Number of calibration qubits cal_strings (list): List of cal strings for balanced cals """ counts = [] for job in jobs: try: res = job.result() # pylint: disable=broad-except except Exception as error: mit._job_error = error return else: _counts = res.get_counts() # _counts can be a list or a dict (if only one circuit was executed within the job) if isinstance(_counts, list): counts.extend(_counts) else: counts.append(_counts) # attach timestamp if hasattr(job, "metrics"): timestamp = job.metrics()["timestamps"]["running"] else: timestamp = None logger.info("All jobs are done.") # Timestamp can be None if timestamp is None: timestamp = datetime.datetime.now() # Needed since Aer result date is str but IBMQ job is datetime if isinstance(timestamp, datetime.datetime): timestamp = timestamp.isoformat() # Go to UTC times because we are going to use this for # resultsDB storage as well try: dt = datetime.datetime.fromisoformat(timestamp) except ValueError: dt = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") dt_utc = dt.astimezone(datetime.timezone.utc) mit.cal_timestamp = dt_utc.isoformat() # A list of qubits with bad meas cals bad_list = [] if mit.cal_method == "independent": for idx, qubit in enumerate(qubits): mit.single_qubit_cals[qubit] = np.zeros((2, 2), dtype=np.float32) # Counts 0 has all P00, P10 data, so do that here prep0_counts = counts[2 * idx] P10 = prep0_counts.get("1", 0) / mit.cal_shots P00 = 1 - P10 mit.single_qubit_cals[qubit][:, 0] = [P00, P10] # plus 1 here since zeros data at pos=0 prep1_counts = counts[2 * idx + 1] P01 = prep1_counts.get("0", 0) / mit.cal_shots P11 = 1 - P01 mit.single_qubit_cals[qubit][:, 1] = [P01, P11] if P01 >= P00: bad_list.append(qubit) elif mit.cal_method == "marginal": prep0_counts = counts[0] prep1_counts = counts[1] for idx, qubit in enumerate(qubits): mit.single_qubit_cals[qubit] = np.zeros((2, 2), dtype=np.float32) count_vals = 0 index = num_cal_qubits - idx - 1 for key, val in prep0_counts.items(): if key[index] == "0": count_vals += val P00 = count_vals / mit.cal_shots P10 = 1 - P00 mit.single_qubit_cals[qubit][:, 0] = [P00, P10] count_vals = 0 for key, val in prep1_counts.items(): if key[index] == "1": count_vals += val P11 = count_vals / mit.cal_shots P01 = 1 - P11 mit.single_qubit_cals[qubit][:, 1] = [P01, P11] if P01 >= P00: bad_list.append(qubit) # balanced calibration else: cals = [np.zeros((2, 2), dtype=np.float32) for kk in range(num_cal_qubits)] for idx, count in enumerate(counts): target = cal_strings[idx][::-1] good_prep = np.zeros(num_cal_qubits, dtype=np.float32) # divide by 2 since total shots is double denom = mit._balanced_shots for key, val in count.items(): key = key[::-1] for kk in range(num_cal_qubits): if key[kk] == target[kk]: good_prep[kk] += val for kk, cal in enumerate(cals): if target[kk] == "0": cal[0, 0] += good_prep[kk] / denom else: cal[1, 1] += good_prep[kk] / denom for cal in cals: cal[1, 0] = 1.0 - cal[0, 0] cal[0, 1] = 1.0 - cal[1, 1] for idx, cal in enumerate(cals): mit.single_qubit_cals[qubits[idx]] = cal # save cals to file, if requested if mit.cals_file: mit.cals_to_file(mit.cals_file) # faulty qubits, if any mit.faulty_qubits = _faulty_qubit_checker(mit.single_qubit_cals) def _faulty_qubit_checker(cals): """Find faulty qubits in cals Parameters: cals (list): Input list of calibrations Returns: list: Faulty qubits """ faulty_qubits = [] for idx, cal in enumerate(cals): if cal is not None: if cal[0, 1] >= cal[0, 0]: faulty_qubits.append(idx) return faulty_qubits