import warnings
from typing import Optional
import numpy as np
from qemcmc.circuits import CircuitMaker
from qemcmc.coarse_grain import CoarseGraining
from qemcmc.model import EnergyModel
from qemcmc.sampler import Proposal
warnings.filterwarnings("ignore", category=RuntimeWarning)
[docs]
class QeProposal(Proposal):
"""
Quantum-enhanced Markov Chain Monte Carlo sampler.
This class implements the proposal mechanism of the quantum-enhanced
MCMC algorithm. Candidate states are generated by simulating the
quantum time evolution of a transverse-field Hamiltonian.
The quantum proposal circuit is constructed using :class:`CircuitMaker`
and can optionally operate on coarse-grained subgroups of spins to
improve scalability.
Parameters
----------
model : EnergyModel
Energy model defining the target Boltzmann distribution.
gamma : float | tuple[float, float]
Transverse field strength (Γ). If a tuple is provided, a value is
sampled uniformly from the range [min, max] at each step.
time : float | tuple[float, float]
Total evolution time. If a tuple is provided, a value is sampled
uniformly from the range [min, max] at each step.
r : int | None, optional
Number of Trotter steps. Mutually exclusive with ``delta_t``.
If provided, the Trotter step size is derived as Δt = t / r and
``r`` stays fixed while Δt varies with ``t``. A warning is issued
at init if the resulting Δt range falls outside [0.1, 2.0].
By default ``None``.
delta_t : float | None, optional
Trotter step size. Mutually exclusive with ``r``.
If provided, the number of Trotter steps is derived as
r = floor(t / Δt) per step, so ``r`` varies with ``t`` while
Δt stays fixed. A warning is issued if Δt falls outside
[0.1, 2.0]. By default ``None``.
coarse_graining : CoarseGraining | None, optional
A coarse-graining scheme to define spin subgroups. If None, no coarse-graining
is used. By default ``None``.
coupling_weights : list[float | tuple[float, float]] | None, optional
Weights for the coupling tensors. If a tuple is provided, a weight is
sampled uniformly from the range. This allows for dynamic adjustment
of the influence of different terms in the Hamiltonian during the proposal
step. By default ``None``.
Note that this is further adjusted by (1 - gamma) to balance the influence of the
problem Hamiltonian with the mixer term. Divide by (1 - gamma) if needed.
Also, note that the coupling weights should include the mixing term.
m : int, optional
Number of subgroups to partition the spins into for sequential updates.
By default ``1``.
Notes
-----
The proposal step simulates the time evolution under a transverse-field Hamiltonian defined by the energy model
and measures the resulting state to produce a candidate configuration.
This proposal is then accepted or rejected using the Metropolis criterion
to ensure convergence to the target Boltzmann distribution.
Exactly one of ``r`` or ``delta_t`` should be passed in as an argument. If neither is
given, a default Δt of 0.8 is used.
"""
def __init__(
self,
model: EnergyModel,
gamma: float | tuple[float, float],
time: float | tuple[float, float],
r: Optional[int] = None,
delta_t: Optional[float] = None,
coarse_graining: Optional[CoarseGraining] = None,
coupling_weights: Optional[list[float | tuple[float, float]]] = None,
m: int = 1,
):
super().__init__(model)
if coupling_weights is not None:
if len(coupling_weights) != len(model.normalised_couplings):
raise ValueError(
f"Length of coupling_weights ({len(coupling_weights)}) must match the number of couplings in the model ({len(model.normalised_couplings)}), including constraint terms."
)
self.coupling_weights = coupling_weights
else:
self.coupling_weights = list(np.ones(len(model.normalised_couplings)))
[docs]
self.gamma = self._validate_gamma(gamma)
[docs]
self.time = self._validate_time(time)
[docs]
self.r = self._validate_r(r)
[docs]
self.delta_t = self._validate_delta_t(delta_t)
self._validate_trotter_params()
[docs]
self.method = "quantum"
[docs]
self.CM = CircuitMaker(self.model)
[docs]
self.cg = coarse_graining or CoarseGraining(model.n_spins)
[docs]
def update(self, current_state: str) -> str:
"""
Perform 'm' sequential quantum updates across non-overlapping subgroups.
Parameters
----------
current_state : str
Current state of the system as a bitstring.
Returns
-------
str
Updated state of the system as a bitstring.
"""
if not isinstance(current_state, str):
raise TypeError(f"Bitstring must be of type str, got {type(current_state)}: {current_state!r}")
# Generate m disjoint partitions of the spins
partitions = self.cg.get_partitions(m=self.m)
final_coupling_weights, g, t, r = self.sample_hyperparams()
working_state = current_state
for subgroup in partitions:
# Recalculate local couplings as spins outside the subgroup may have flipped
local_couplings = self.model.get_subgroup_couplings(subgroup=subgroup, current_state=working_state, coupling_weights=final_coupling_weights)
working_state = self.CM.update(s=working_state, subgroup_choice=subgroup, local_couplings=local_couplings, gamma=g, time=t, r=r)
return working_state
[docs]
def sample_hyperparams(self) -> tuple[list[float], float, float, int]:
"""
Sample hyperparameters (coupling weights, gamma, time, r) for the current proposal step.
Returns
-------
tuple[list[float], float, float, int]
(final_coupling_weights, gamma, time, r) for this proposal step.
"""
# Sample gamma and time once per full proposal step
g = self.gamma if not isinstance(self.gamma, tuple) else np.random.uniform(*self.gamma)
t = self.time if not isinstance(self.time, tuple) else np.random.uniform(*self.time)
# Determine r (number of Trotter steps) for this step
if self.r is not None:
# Fixed r mode: r stays constant, Δt varies with t
r = self.r
else:
# Fixed Δt mode (explicit or default): r varies with t
dt = self.delta_t if self.delta_t is not None else DEFAULT_DELTA_T
r = max(1, int(np.floor(t / dt)))
# Pre-sample weights for all unique coupling weight settings (tuples or floats)
unique_items = sorted(list(set(self.coupling_weights)), key=lambda x: str(x))
unique_index = [[i for i, x in enumerate(self.coupling_weights) if x == item] for item in unique_items]
sampled_weights = np.ones(len(unique_items), dtype=float)
for i, item in enumerate(unique_items):
if isinstance(item, (int, float)):
sampled_weights[i] = item
elif isinstance(item, tuple):
sampled_weights[i] = np.random.uniform(*item)
# Adjust the sampled weights by (1 - gamma) to balance their influence with the mixer term in the Hamiltonian
sampled_weights_adjusted_by_gamma = (1 - g) * sampled_weights # Adjust weights by (1 - gamma) to balance with the mixer term
# Construct the final list of coupling weights for this update step
final_coupling_weights = np.ones(len(self.coupling_weights))
for i, indices in enumerate(unique_index):
for index in indices:
final_coupling_weights[index] = sampled_weights_adjusted_by_gamma[i]
return final_coupling_weights, g, t, r
[docs]
def _validate_gamma(self, gamma: float | tuple[float, float]) -> float | tuple[float, float]:
"""
Validate the weight parameter gamma.
"""
if isinstance(gamma, (float, int)):
if not (0.0 <= gamma <= 1.0):
raise ValueError(f"gamma must be in [0, 1], got {gamma}")
return float(gamma)
if isinstance(gamma, (tuple, list)):
if len(gamma) != 2:
raise ValueError(f"gamma tuple or list must be (min, max), got {gamma}")
g_min, g_max = gamma
if not (0.0 <= g_min <= g_max <= 1.0):
raise ValueError(f"gamma range must satisfy 0 <= min <= max <= 1, got {gamma}")
return (float(g_min), float(g_max))
raise TypeError(f"gamma must be a float or tuple[float, float] or list[float, float] got {type(gamma)}")
[docs]
def _validate_time(self, time: float | tuple[float, float]) -> float | tuple[float, float]:
"""
Validate the total evolution time parameter.
"""
if isinstance(time, (int, float)):
if time <= 0:
raise ValueError(f"time must be positive, got {time}")
return float(time)
if isinstance(time, (tuple, list)):
if len(time) != 2:
raise ValueError(f"time tuple must be (min, max), got {time}")
t_min, t_max = time
if t_min <= 0 or t_max <= 0:
raise ValueError(f"time values must be positive, got {time}")
if t_min > t_max:
raise ValueError(f"time range must satisfy min <= max, got {time}")
return (float(t_min), float(t_max))
raise TypeError(f"time must be a float or tuple[float, float], got {type(time)}")
[docs]
def _validate_r(self, r: Optional[int]) -> Optional[int]:
"""
Validate the number of Trotter steps 'r'.
"""
if r is None:
return None
if not isinstance(r, int):
raise TypeError(f"r must be an int or None, got {type(r)}")
if r <= 0:
raise ValueError(f"r must be a positive integer, got {r}")
return r
[docs]
def _validate_delta_t(self, delta_t: Optional[float]) -> Optional[float]:
"""
Validate the Trotter step size 'delta_t'.
"""
if delta_t is None:
return None
if not isinstance(delta_t, (int, float)):
raise TypeError(f"delta_t must be a float or None, got {type(delta_t)}")
if delta_t <= 0:
raise ValueError(f"delta_t must be positive, got {delta_t}")
return float(delta_t)
[docs]
def _validate_trotter_params(self):
"""
Check mutual exclusivity of r and delta_t, and warn
if resulting Trotter step size falls outside [0.1, 2.0].
"""
if self.r is not None and self.delta_t is not None:
raise ValueError("Only one of 'r' (number of Trotter steps) or 'delta_t' (Trotter step size) should be provided, not both.")
if self.r is not None:
# Fixed r: warn if Δt = t/r is outside [0.1, 2.0] for any t in the range
if isinstance(self.time, tuple):
dt_min = self.time[0] / self.r
dt_max = self.time[1] / self.r
if dt_min < 0.1 or dt_max > 2.0:
warnings.warn(
f"With r={self.r} and time=({self.time[0]}, {self.time[1]}), "
f"the Trotter step size Δt ranges from {dt_min:.4f} to {dt_max:.4f}, "
f"which falls outside the recommended range [0.1, 2.0].",
stacklevel=3,
)
else:
dt = self.time / self.r
if dt < 0.1 or dt > 2.0:
warnings.warn(
f"Trotter step size Δt = t/r = {self.time}/{self.r} = {dt:.4f} is outside the recommended range [0.1, 2.0].",
stacklevel=3,
)
if self.delta_t is not None:
if self.delta_t < 0.1 or self.delta_t > 2.0:
warnings.warn(
f"Trotter step size Δt = {self.delta_t} is outside the recommended range [0.1, 2.0].",
stacklevel=3,
)