"""
Provides a class for representing Gaussian measures on Hilbert spaces.
This module generalizes the concept of a multivariate normal distribution to
the setting of abstract Hilbert spaces. A `GaussianMeasure` is defined by its
expectation (a vector in the space) and its covariance (a self-adjoint,
positive semi-definite `LinearOperator`).
This abstraction is fundamental for Bayesian inference, Gaussian processes, and
data assimilation in function spaces.
Key Features
------------
- Multiple factory methods for creating measures from various inputs (matrices,
samples, standard deviations).
- A method for drawing random samples from the measure.
- Implementation of the affine transformation rule (`y = A(x) + b`).
- Support for creating low-rank approximations of the measure for efficiency.
- Overloaded arithmetic operators for intuitive combination of measures.
"""
from __future__ import annotations
from typing import Callable, Optional, Any, List, TYPE_CHECKING, Tuple, Literal
import warnings
import numpy as np
from scipy.linalg import cholesky, cho_solve
from scipy.linalg import eigh
from scipy.sparse import diags
from scipy.stats import chi2, multivariate_normal
import scipy.sparse as sps
import scipy.sparse.linalg as spla
from joblib import Parallel, delayed
from .hilbert_space import (
EuclideanSpace,
HilbertModule,
MassWeightedHilbertModule,
MassWeightedHilbertSpace,
Vector,
)
from .linear_operators import (
LinearOperator,
DiagonalSparseMatrixLinearOperator,
SparseMatrixLinearOperator,
)
from .linear_solvers import IterativeLinearSolver, LinearSolver
from .affine_operators import AffineOperator
from .direct_sum import BlockDiagonalLinearOperator, BlockLinearOperator
if TYPE_CHECKING:
from .hilbert_space import HilbertSpace
[docs]
class GaussianMeasure:
"""
Represents a Gaussian measure on a Hilbert space.
This class generalizes the multivariate normal distribution to abstract,
potentially infinite-dimensional, Hilbert spaces. A measure is
defined by its expectation (mean vector) and its covariance, which is a
`LinearOperator` on the space.
"""
def __init__(
self,
/,
*,
covariance: LinearOperator = None,
covariance_factor: LinearOperator = None,
expectation: Vector = None,
sample: Callable[[], Vector] = None,
inverse_covariance: LinearOperator = None,
inverse_covariance_factor: LinearOperator = None,
) -> None:
"""
Initializes the GaussianMeasure.
The measure can be defined in several ways, primarily by providing
either a covariance operator or a covariance factor.
Args:
covariance: A self-adjoint and positive semi-definite linear
operator on the domain.
covariance_factor: A linear operator L such that the covariance
C = L @ L*.
expectation: The expectation (mean) of the measure. Defaults to
the zero vector of the space (stored internally as None).
sample: A function that returns a random sample from the measure.
If a `covariance_factor` is given, a default sampler is created.
inverse_covariance: The inverse of the covariance operator (the
precision operator).
inverse_covariance_factor: A factor Li of the inverse covariance,
such that C_inv = Li.T @ Li.
Raises:
ValueError: If neither `covariance` nor `covariance_factor` is provided.
"""
if covariance is None and covariance_factor is None:
raise ValueError(
"Neither covariance nor covariance_factor has been provided."
)
self._covariance_factor: Optional[LinearOperator] = covariance_factor
self._covariance: LinearOperator = (
covariance_factor @ covariance_factor.adjoint
if covariance is None
else covariance
)
self._domain: HilbertSpace = self._covariance.domain
self._sample: Optional[Callable[[], Vector]] = (
sample if covariance_factor is None else self._sample_from_factor
)
self._inverse_covariance_factor: Optional[LinearOperator] = (
inverse_covariance_factor
)
if inverse_covariance_factor is not None:
self._inverse_covariance: Optional[LinearOperator] = (
inverse_covariance_factor.adjoint @ inverse_covariance_factor
)
elif inverse_covariance is not None:
self._inverse_covariance = inverse_covariance
else:
self._inverse_covariance = None
self._expectation: Optional[Vector] = expectation
# ---------------------------------------- #
# Constructors #
# ---------------------------------------- #
[docs]
@staticmethod
def from_standard_deviation(
domain: HilbertSpace,
standard_deviation: float,
/,
*,
expectation: Vector = None,
) -> GaussianMeasure:
"""
Creates an isotropic Gaussian measure with a scaled identity covariance.
Args:
domain: The Hilbert space on which the measure is defined.
standard_deviation: The uniform standard deviation for all dimensions.
expectation: The mean vector. Defaults to the zero vector.
Returns:
A new GaussianMeasure instance.
"""
covariance_factor = standard_deviation * domain.identity_operator()
inverse_covariance_factor = (
1 / standard_deviation
) * domain.identity_operator()
return GaussianMeasure(
covariance_factor=covariance_factor,
inverse_covariance_factor=inverse_covariance_factor,
expectation=expectation,
)
[docs]
@staticmethod
def from_standard_deviations(
domain: HilbertSpace,
standard_deviations: np.ndarray,
/,
*,
expectation: Vector = None,
) -> GaussianMeasure:
"""
Creates a Gaussian measure with a diagonal covariance operator.
Args:
domain: The Hilbert space on which the measure is defined.
standard_deviations: A 1D NumPy array representing the diagonal
entries of the covariance factor.
expectation: The mean vector. Defaults to the zero vector.
Returns:
A new GaussianMeasure instance.
Raises:
ValueError: If the size of the array does not match the space dimension.
"""
if standard_deviations.size != domain.dim:
raise ValueError(
"Standard deviation vector does not have the correct length."
)
euclidean = EuclideanSpace(domain.dim)
covariance_factor = DiagonalSparseMatrixLinearOperator.from_diagonal_values(
euclidean, domain, standard_deviations
)
return GaussianMeasure(
covariance_factor=covariance_factor,
inverse_covariance_factor=covariance_factor.inverse,
expectation=expectation,
)
[docs]
@staticmethod
def from_covariance_matrix(
domain: HilbertSpace,
covariance_matrix: np.ndarray,
/,
*,
expectation: Vector = None,
rtol: float = 1e-10,
) -> GaussianMeasure:
"""
Creates a Gaussian measure from a dense covariance matrix.
Args:
domain: The Hilbert space on which the measure is defined.
covariance_matrix: A 2D symmetric positive semi-definite NumPy array.
expectation: The mean vector. Defaults to the zero vector.
rtol: Relative tolerance for clipping small negative eigenvalues
caused by floating-point inaccuracies.
Returns:
A new GaussianMeasure instance.
Raises:
ValueError: If the matrix has significantly negative eigenvalues.
"""
eigenvalues, U = eigh(covariance_matrix)
if np.any(eigenvalues < 0):
max_eig = np.max(np.abs(eigenvalues))
min_eig = np.min(eigenvalues)
if min_eig < -rtol * max_eig:
raise ValueError(
"Covariance matrix has significantly negative eigenvalues, "
"indicating it is not positive semi-definite."
)
else:
warnings.warn(
"Covariance matrix has small negative eigenvalues due to "
"numerical error. Clipping them to zero.",
UserWarning,
)
eigenvalues[eigenvalues < 0] = 0
values = np.sqrt(eigenvalues)
D = diags([values], [0])
out_array = np.zeros_like(values, dtype=float)
Di = diags([np.reciprocal(values, out=out_array, where=(values != 0))], [0])
L = U @ D
Li = Di @ U.T
covariance_factor = LinearOperator.from_matrix(
EuclideanSpace(domain.dim), domain, L, galerkin=True
)
inverse_covariance_factor = LinearOperator.from_matrix(
domain, EuclideanSpace(domain.dim), Li, galerkin=False
)
return GaussianMeasure(
covariance_factor=covariance_factor,
inverse_covariance_factor=inverse_covariance_factor,
expectation=expectation,
)
[docs]
@staticmethod
def from_samples(domain: HilbertSpace, samples: List[Vector], /) -> GaussianMeasure:
"""
Estimates a Gaussian measure from a collection of sample vectors.
Constructs an empirical mean and an unnormalized sample covariance operator
using a tensor product expansion.
Args:
domain: The Hilbert space the samples belong to.
samples: A list of sample vectors.
Returns:
A new GaussianMeasure instance.
Raises:
ValueError: If the list of samples is empty.
"""
assert all([domain.is_element(x) for x in samples])
n = len(samples)
if n == 0:
raise ValueError("Cannot estimate measure from zero samples.")
expectation = domain.sample_expectation(samples)
if n == 1:
covariance = domain.zero_operator()
def sample() -> Vector:
return expectation
else:
offsets = [domain.subtract(x, expectation) for x in samples]
covariance = LinearOperator.self_adjoint_from_tensor_product(
domain, offsets
) / (n - 1)
def sample() -> Vector:
x = domain.copy(expectation)
randoms = np.random.randn(len(offsets))
for y, r in zip(offsets, randoms):
domain.axpy(r / np.sqrt(n - 1), y, x)
return x
return GaussianMeasure(
covariance=covariance, expectation=expectation, sample=sample
)
[docs]
@staticmethod
def from_direct_sum(measures: List[GaussianMeasure], /) -> GaussianMeasure:
"""
Constructs a product measure from a list of other measures.
The resulting measure resides on the direct sum of the input domains,
with block-diagonal covariance and concatenated expectations.
Args:
measures: A list of GaussianMeasure instances.
Returns:
A new GaussianMeasure instance defined on the direct sum space.
"""
if all(measure.has_zero_expectation for measure in measures):
expectation = None
else:
expectation = [measure.expectation for measure in measures]
covariance = BlockDiagonalLinearOperator(
[measure.covariance for measure in measures]
)
inverse_covariance = (
BlockDiagonalLinearOperator(
[measure.inverse_covariance for measure in measures]
)
if all(measure.inverse_covariance_set for measure in measures)
else None
)
def sample_impl() -> List[Vector]:
return [measure.sample() for measure in measures]
sample = (
sample_impl if all(measure.sample_set for measure in measures) else None
)
return GaussianMeasure(
covariance=covariance,
expectation=expectation,
sample=sample,
inverse_covariance=inverse_covariance,
)
# ---------------------------------------- #
# Properties #
# ---------------------------------------- #
@property
def domain(self) -> HilbertSpace:
"""The Hilbert space the measure is defined on."""
return self._domain
@property
def covariance(self) -> LinearOperator:
"""The covariance operator of the measure."""
return self._covariance
@property
def inverse_covariance_set(self) -> bool:
"""True if the inverse covariance (precision) operator is available."""
return self._inverse_covariance is not None
@property
def inverse_covariance(self) -> LinearOperator:
"""The inverse covariance (precision) operator. Raises AttributeError if not set."""
if self._inverse_covariance is None:
raise AttributeError("Inverse covariance is not set for this measure.")
return self._inverse_covariance
@property
def covariance_factor_set(self) -> bool:
"""True if a covariance factor L (such that C = L @ L*) is available."""
return self._covariance_factor is not None
@property
def covariance_factor(self) -> LinearOperator:
"""The covariance factor L. Raises AttributeError if not set."""
if self._covariance_factor is None:
raise AttributeError("Covariance factor has not been set.")
return self._covariance_factor
@property
def inverse_covariance_factor_set(self) -> bool:
"""True if an inverse covariance factor is available."""
return self._inverse_covariance_factor is not None
@property
def inverse_covariance_factor(self) -> LinearOperator:
"""The inverse covariance factor. Raises AttributeError if not set."""
if self._inverse_covariance_factor is None:
raise AttributeError("Inverse covariance factor has not been set.")
return self._inverse_covariance_factor
@property
def has_zero_expectation(self) -> bool:
"""True if the measure is internally stored with an exactly zero expectation."""
return self._expectation is None
@property
def expectation(self) -> Vector:
"""The expectation (mean) vector of the measure."""
if self.has_zero_expectation:
return self.domain.zero
return self._expectation
@property
def sample_set(self) -> bool:
"""True if a method for drawing random samples is available."""
return self._sample is not None
# ---------------------------------------- #
# Public methods #
# ---------------------------------------- #
[docs]
def sample(self) -> Vector:
"""
Returns a single random sample drawn from the measure.
Returns:
A randomly sampled vector.
Raises:
NotImplementedError: If a sample method is not set for this measure.
"""
if self._sample is None:
raise NotImplementedError("A sample method is not set for this measure.")
return self._sample()
[docs]
def samples(
self, n: int, /, *, parallel: bool = False, n_jobs: int = -1
) -> List[Vector]:
"""
Returns a list of `n` independent random samples from the measure.
Args:
n: The number of samples to draw.
parallel: If True, draws samples concurrently.
n_jobs: The number of CPU cores to use if parallel=True.
Returns:
A list of sampled vectors.
Raises:
ValueError: If `n` is less than 1.
"""
if n < 1:
raise ValueError("Number of samples must be a positive integer.")
if not parallel:
return [self.sample() for _ in range(n)]
return Parallel(n_jobs=n_jobs)(delayed(self.sample)() for _ in range(n))
[docs]
def sample_expectation(
self, n: int, /, *, parallel: bool = False, n_jobs: int = -1
) -> Vector:
"""
Estimates the expectation vector by drawing `n` Monte Carlo samples.
Args:
n: The number of samples to use for the estimation.
parallel: If True, draws samples concurrently.
n_jobs: The number of CPU cores to use if parallel=True.
Returns:
The empirical expectation vector.
"""
if n < 1:
raise ValueError("Number of samples must be a positive integer.")
return self.domain.sample_expectation(
self.samples(n, parallel=parallel, n_jobs=n_jobs)
)
[docs]
def sample_pointwise_variance(
self, n: int, /, *, parallel: bool = False, n_jobs: int = -1
) -> Vector:
"""
Estimates the pointwise variance field by drawing `n` Monte Carlo samples.
Args:
n: The number of samples to use.
parallel: If True, draws samples concurrently.
n_jobs: The number of CPU cores to use if parallel=True.
Returns:
A vector representing the pointwise variance field.
Raises:
NotImplementedError: If the domain is not a HilbertModule (which
provides pointwise multiplication).
"""
if not isinstance(self.domain, HilbertModule):
raise NotImplementedError(
"Pointwise variance requires vector multiplication on the domain."
)
if n < 1:
raise ValueError("Number of samples must be a positive integer.")
samples = self.samples(n, parallel=parallel, n_jobs=n_jobs)
variance = self.domain.zero
if self.has_zero_expectation:
for sample in samples:
prod = self.domain.vector_multiply(sample, sample)
self.domain.axpy(1 / n, prod, variance)
else:
expectation = self.expectation
for sample in samples:
diff = self.domain.subtract(sample, expectation)
prod = self.domain.vector_multiply(diff, diff)
self.domain.axpy(1 / n, prod, variance)
return variance
[docs]
def sample_pointwise_std(
self, n: int, /, *, parallel: bool = False, n_jobs: int = -1
) -> Vector:
"""
Estimates the pointwise standard deviation field by drawing `n` Monte Carlo samples.
Args:
n: The number of samples to use.
parallel: If True, draws samples concurrently.
n_jobs: The number of CPU cores to use if parallel=True.
Returns:
A vector representing the pointwise standard deviation field.
"""
variance = self.sample_pointwise_variance(n, parallel=parallel, n_jobs=n_jobs)
return self.domain.vector_sqrt(variance)
[docs]
def deflated_pointwise_variance(
self,
rank: int,
/,
*,
size_estimate: int = 0,
method: str = "variable",
max_samples: int = None,
rtol: float = 1e-2,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> Vector:
"""
Estimates the pointwise variance field using a deflated Hutchinson's method.
This combines a deterministic low-rank extraction (via SVD deflation) with
a stochastic Hutchinson trace estimator for the residual variance.
Args:
rank: The rank of the deterministic SVD deflation.
size_estimate: The initial number of stochastic residual samples.
method: 'variable' to sample until `rtol` is met, 'fixed' otherwise.
max_samples: Hard limit on stochastic residual samples.
rtol: Relative tolerance for the stochastic residual phase.
block_size: Number of samples added per check in the 'variable' method.
parallel: If True, draws stochastic samples in parallel.
n_jobs: The number of CPU cores to use.
Returns:
A vector representing the pointwise variance field.
Raises:
NotImplementedError: If the domain is not a HilbertModule.
"""
if not isinstance(self.domain, HilbertModule):
raise NotImplementedError(
"Pointwise variance requires vector multiplication on the domain."
)
if rank < 0 or size_estimate < 0:
raise ValueError("Rank and size_estimate must be non-negative.")
space = self.domain
deterministic_var = space.zero
if rank > 0:
if rank == 1:
raise ValueError("Rank must be greater than 1.")
F = self.low_rank_approximation(rank, method="fixed").covariance_factor
actual_rank = F.domain.dim
for i in range(actual_rank):
e_i = F.domain.basis_vector(i)
spatial_mode = F(e_i)
squared_mode = space.vector_multiply(spatial_mode, spatial_mode)
space.axpy(1.0, squared_mode, deterministic_var)
residual_cov = self.covariance - (F @ F.adjoint)
else:
residual_cov = self.covariance
if size_estimate == 0 and method == "fixed":
return deterministic_var
if max_samples is None:
max_samples = space.dim
num_samples = min(size_estimate, max_samples)
if hasattr(space, "squared_norms"):
inv_norms = 1.0 / np.sqrt(space.squared_norms)
else:
inv_norms = np.array(
[
1.0 / np.sqrt(space.squared_norm(space.basis_vector(i)))
for i in range(space.dim)
]
)
def _compute_sample_sum(n_samples_to_draw: int) -> Vector:
def _single_sample():
c_z = np.random.choice([-1.0, 1.0], size=space.dim) * inv_norms
z = space.from_components(c_z)
R_z = residual_cov(z)
return space.vector_multiply(z, R_z)
block_sum = space.zero
if parallel:
results = Parallel(n_jobs=n_jobs)(
delayed(_single_sample)() for _ in range(n_samples_to_draw)
)
for res in results:
space.axpy(1.0, res, block_sum)
else:
for _ in range(n_samples_to_draw):
space.axpy(1.0, _single_sample(), block_sum)
return block_sum
residual_sum = _compute_sample_sum(num_samples)
residual_est = (
space.multiply(1.0 / num_samples, residual_sum)
if num_samples > 0
else space.zero
)
if method == "variable" and num_samples < max_samples:
while num_samples < max_samples:
old_est = space.copy(residual_est)
samples_to_add = min(block_size, max_samples - num_samples)
new_sum = _compute_sample_sum(samples_to_add)
space.axpy(1.0, new_sum, residual_sum)
total_samples = num_samples + samples_to_add
residual_est = space.multiply(1.0 / total_samples, residual_sum)
norm_new = space.norm(residual_est)
if norm_new > 0:
diff = space.subtract(residual_est, old_est)
error = space.norm(diff) / norm_new
if error < rtol:
break
num_samples = total_samples
return space.add(deterministic_var, residual_est)
[docs]
def deflated_pointwise_std(
self,
rank: int,
/,
*,
size_estimate: int = 0,
method: str = "variable",
max_samples: int = None,
rtol: float = 1e-2,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> Vector:
"""
Estimates the pointwise standard deviation field using a deflated Hutchinson's method.
Args:
rank: The rank of the deterministic SVD deflation.
size_estimate: The initial number of stochastic residual samples.
method: 'variable' to sample until `rtol` is met, 'fixed' otherwise.
max_samples: Hard limit on stochastic residual samples.
rtol: Relative tolerance for the stochastic residual phase.
block_size: Number of samples added per check in the 'variable' method.
parallel: If True, draws stochastic samples in parallel.
n_jobs: The number of CPU cores to use.
Returns:
A vector representing the pointwise standard deviation field.
"""
if not isinstance(self.domain, HilbertModule):
raise NotImplementedError(
"Pointwise standard deviation requires vector multiplication on the domain "
"(the domain must be a HilbertModule)."
)
variance = self.deflated_pointwise_variance(
rank,
size_estimate=size_estimate,
method=method,
max_samples=max_samples,
rtol=rtol,
block_size=block_size,
parallel=parallel,
n_jobs=n_jobs,
)
return self.domain.vector_sqrt(variance)
[docs]
def credible_set(
self,
probability: float,
/,
*,
geometry: str = "ellipsoid",
rank: Optional[int] = None,
open_set: bool = False,
theta: Optional[float] = None,
spectrum=None,
spectrum_size: Optional[int] = None,
radius_method: str = "auto",
quantile_method: str = "auto",
quantile_tol: float = 1e-2,
fractional_apply: str = "auto",
n_samples: int = 10_000,
lanczos_size_estimate: int = 50,
lanczos_method: Literal["variable", "fixed"] = "fixed",
lanczos_max_k: Optional[int] = None,
lanczos_rtol: float = 1e-3,
lanczos_atol: float = 1e-8,
lanczos_check_interval: int = 5,
spectrum_low_rank_kwargs: Optional[dict] = None,
rng: Optional[np.random.Generator] = None,
):
r"""
Return a probability-calibrated Gaussian credible subset.
Five geometries are supported:
``"ellipsoid"`` / ``"mahalanobis"`` / ``"domain"``
The classical Mahalanobis ellipsoid.
``"cameron_martin"`` / ``"cm"`` / ``"ball"`` / ``"norm_ball"``
The ellipsoid expressed as a unit ball in the Cameron-Martin geometry.
``"ambient_ball"`` / ``"ambient"``
The ambient norm ball $\{m : \|m - m_0\|_H \le r_p\}$.
``"weakened_ellipsoid"`` / ``"fractional"``
The weakened-covariance ellipsoid $\{m : \|C^{-\theta/2}(m-m_0)\|_H \le r_p\}$.
Args:
probability: Credible probability $p$, strictly between 0 and 1.
geometry: Selects the ball/ellipsoid family (see above).
rank: Chi-square degrees of freedom (legacy modes only).
open_set: If true, return the open version of the set.
theta: Fractional exponent in $(0, 1)$, required for weakened_ellipsoid.
spectrum: Covariance spectrum specification.
spectrum_size: Truncation length when ``spectrum`` is callable or ``None``.
radius_method: ``"auto"``, ``"spectral"``, or ``"sampling"``.
quantile_method: Weighted-chi-square quantile method.
quantile_tol: Desired relative accuracy of the weighted-chi-square quantile.
fractional_apply: How to apply $C^{-\theta/2}$ for the weakened ellipsoid.
n_samples: Monte Carlo sample count for sampling radius.
lanczos_size_estimate: Initial or fixed Krylov dimension for Lanczos fractional evaluation.
lanczos_method: 'fixed' or 'variable' dynamic convergence for Lanczos.
lanczos_max_k: Maximum Krylov dimension if 'variable' is used.
lanczos_rtol: Relative tolerance for Lanczos convergence.
lanczos_atol: Absolute tolerance for Lanczos convergence.
lanczos_check_interval: Number of iterations between Lanczos convergence checks.
spectrum_low_rank_kwargs: Extra kwargs forwarded to ``LowRankEig.from_randomized``.
rng: Optional NumPy generator for Monte Carlo paths.
Returns:
An Ellipsoid or Ball defining the credible subset.
"""
if not 0.0 < probability < 1.0:
raise ValueError("Probability must lie strictly between 0 and 1.")
geometry_key = geometry.lower().replace("-", "_")
if geometry_key in {"ellipsoid", "domain", "mahalanobis"}:
return self._credible_set_chi2_ellipsoid(
probability, rank=rank, open_set=open_set
)
if geometry_key in {"cameron_martin", "cm", "ball", "norm_ball"}:
return self._credible_set_chi2_cameron_martin(
probability, rank=rank, open_set=open_set
)
if geometry_key in {"ambient_ball", "ambient", "h_ball"}:
return self._credible_set_ambient_ball(
probability,
open_set=open_set,
spectrum=spectrum,
spectrum_size=spectrum_size,
radius_method=radius_method,
quantile_method=quantile_method,
quantile_tol=quantile_tol,
n_samples=n_samples,
spectrum_low_rank_kwargs=spectrum_low_rank_kwargs,
rng=rng,
)
if geometry_key in {"weakened_ellipsoid", "fractional"}:
if theta is None:
raise ValueError("theta is required for geometry='weakened_ellipsoid'.")
if not 0.0 < theta < 1.0:
raise ValueError(
"theta must lie strictly in (0, 1); the boundary "
"theta=1 is the Cameron-Martin norm — use "
"geometry='cameron_martin' for the finite-rank "
"chi-square version."
)
return self._credible_set_weakened_ellipsoid(
probability,
theta=theta,
open_set=open_set,
spectrum=spectrum,
spectrum_size=spectrum_size,
radius_method=radius_method,
quantile_method=quantile_method,
quantile_tol=quantile_tol,
fractional_apply=fractional_apply,
n_samples=n_samples,
lanczos_size_estimate=lanczos_size_estimate,
lanczos_method=lanczos_method,
lanczos_max_k=lanczos_max_k,
lanczos_rtol=lanczos_rtol,
lanczos_atol=lanczos_atol,
lanczos_check_interval=lanczos_check_interval,
spectrum_low_rank_kwargs=spectrum_low_rank_kwargs,
rng=rng,
)
raise ValueError(
"Geometry must be one of 'ellipsoid', 'cameron_martin', "
"'ambient_ball', or 'weakened_ellipsoid'."
)
[docs]
def ambient_ball(self, probability: float, /, **kwargs):
"""Shortcut for ``credible_set(..., geometry='ambient_ball', ...)``."""
kwargs.setdefault("geometry", "ambient_ball")
return self.credible_set(probability, **kwargs)
[docs]
def weakened_ellipsoid(self, probability: float, /, *, theta: float, **kwargs):
"""Shortcut for the weakened-covariance ellipsoid mode."""
kwargs.setdefault("geometry", "weakened_ellipsoid")
kwargs["theta"] = theta
return self.credible_set(probability, **kwargs)
[docs]
def with_dense_covariance(
self, /, *, parallel: bool = False, n_jobs: int = -1
) -> GaussianMeasure:
"""
Forms a new Gaussian measure equivalent to the existing one, but
with its covariance matrix stored explicitly in dense form.
Args:
parallel: If True, computes the dense matrix concurrently.
n_jobs: Number of CPU cores to use if parallel=True.
Returns:
A new GaussianMeasure instance backed by a dense matrix.
"""
covariance_matrix = self.covariance.matrix(
dense=True, galerkin=True, parallel=parallel, n_jobs=n_jobs
)
measure = GaussianMeasure.from_covariance_matrix(
self.domain, covariance_matrix, expectation=self._expectation
)
measure._covariance = LinearOperator.self_adjoint_from_matrix(
self.domain, covariance_matrix
)
return measure
[docs]
def with_regularized_inverse(
self,
solver: LinearSolver,
/,
*,
damping: float = 0.0,
preconditioner: Optional[LinearOperator] = None,
) -> GaussianMeasure:
"""
Returns a new GaussianMeasure with a well-defined precision operator
(inverse covariance) computed via Tikhonov regularization.
Args:
solver: The linear solver used to invert the covariance.
damping: Tikhonov regularization parameter added to the diagonal.
preconditioner: Optional preconditioner for iterative solvers.
Returns:
A new GaussianMeasure instance equipped with an inverse covariance.
Raises:
ValueError: If the damping parameter is negative.
"""
if damping < 0.0:
raise ValueError("Damping must be non-negative.")
if damping == 0.0 and self.inverse_covariance_set:
return self
if damping > 0.0:
identity = self.domain.identity_operator()
regularized_covariance = self.covariance + damping * identity
else:
regularized_covariance = self.covariance
if isinstance(solver, IterativeLinearSolver):
inverse_covariance = solver(
regularized_covariance, preconditioner=preconditioner
)
else:
inverse_covariance = solver(regularized_covariance)
if self.sample_set and damping > 0.0:
std_dev = np.sqrt(damping)
noise_measure = GaussianMeasure.from_standard_deviation(
self.domain, std_dev
)
def new_sample() -> Vector:
return self.domain.add(self.sample(), noise_measure.sample())
else:
new_sample = self._sample if self.sample_set else None
return GaussianMeasure(
covariance=regularized_covariance,
expectation=self._expectation,
sample=new_sample,
inverse_covariance=inverse_covariance,
)
[docs]
def with_sparse_approximation(
self,
/,
*,
threshold: float = 1e-3,
max_nnz: Optional[int] = None,
diag_rank: int = 0,
diag_samples: int = 0,
regularization_fraction: float = 1e-4,
parallel: bool = False,
n_jobs: int = -1,
) -> GaussianMeasure:
"""
Creates an approximately equivalent measure with a sparse covariance matrix
and an exactly factorized sparse inverse, built entirely matrix-free.
Args:
threshold: Minimum correlation required to keep an off-diagonal element.
max_nnz: Maximum number of non-zero elements allowed per column.
diag_rank: Rank of deterministic SVD used to estimate the diagonal.
diag_samples: Number of stochastic samples used to estimate the diagonal.
regularization_fraction: Tikhonov regularization applied before sparse inversion.
parallel: If True, computes the sparse approximations concurrently.
n_jobs: Number of CPU cores to use if parallel=True.
Returns:
A new GaussianMeasure backed by sparse operators.
"""
from .low_rank import deflated_diagonal
dim = self.domain.dim
if diag_rank == 0 and diag_samples == 0:
diag_vars = self.covariance.extract_diagonal(
galerkin=True, parallel=parallel, n_jobs=n_jobs
)
else:
diag_vars = deflated_diagonal(
self.covariance,
diag_rank,
diag_samples,
galerkin=True,
parallel=parallel,
n_jobs=n_jobs,
)
safe_diag = np.where(np.abs(diag_vars) < 1e-14, 1.0, np.abs(diag_vars))
scipy_op = self.covariance.matrix(galerkin=True)
def _process_column(j: int):
e_j = np.zeros(dim)
e_j[j] = 1.0
col_vals = scipy_op @ e_j
correlations = np.abs(col_vals) / np.sqrt(safe_diag * safe_diag[j])
valid_indices = np.where(correlations >= threshold)[0]
if max_nnz is not None and len(valid_indices) > max_nnz:
k = max_nnz - 1
if k > 0:
corr_masked = correlations.copy()
corr_masked[j] = -1.0
top_k_indices = np.argpartition(corr_masked, -k)[-k:]
rows = np.append(top_k_indices, j)
else:
rows = np.array([j])
else:
if j not in valid_indices:
rows = np.append(valid_indices, j)
else:
rows = valid_indices
vals = col_vals[rows]
return rows.tolist(), [j] * len(rows), vals.tolist()
if parallel:
results = Parallel(n_jobs=n_jobs)(
delayed(_process_column)(j) for j in range(dim)
)
else:
results = [_process_column(j) for j in range(dim)]
I_global, J_global, V_local = [], [], []
for rows, cols, vals in results:
I_global.extend(rows)
J_global.extend(cols)
V_local.extend(vals)
sparse_cov_matrix = sps.coo_matrix(
(V_local, (I_global, J_global)), shape=(dim, dim)
).tocsc()
sparse_cov_matrix = 0.5 * (sparse_cov_matrix + sparse_cov_matrix.T)
sparse_cov_op = SparseMatrixLinearOperator.from_matrix(
self.covariance.domain,
self.covariance.codomain,
sparse_cov_matrix,
galerkin=True,
)
try:
max_eig = spla.eigsh(
sparse_cov_matrix, k=1, which="LA", return_eigenvectors=False
)[0]
except spla.ArpackNoConvergence:
max_eig = sparse_cov_matrix.diagonal().max()
regularization_matrix = sps.eye(dim, format="csc") * (
max_eig * regularization_fraction
)
regularized_cov_matrix = sparse_cov_matrix + regularization_matrix
lu_factor = spla.splu(regularized_cov_matrix)
def apply_precision(x: Vector) -> Vector:
cx = self.covariance.codomain.to_dual(x)
cx_comp = self.covariance.codomain.dual.to_components(cx)
cy_comp = lu_factor.solve(cx_comp)
return self.covariance.domain.from_components(cy_comp)
sparse_inv_op = LinearOperator(
self.covariance.codomain,
self.covariance.domain,
apply_precision,
adjoint_mapping=apply_precision,
)
return GaussianMeasure(
covariance=sparse_cov_op,
inverse_covariance=sparse_inv_op,
expectation=self._expectation,
)
[docs]
def affine_mapping(
self,
/,
*,
operator: LinearOperator = None,
translation: Vector = None,
affine_operator: AffineOperator = None,
inverse_solver: LinearSolver = None,
inverse_preconditioner: LinearOperator = None,
) -> GaussianMeasure:
"""
Transforms the measure under an affine map `y = A(x) + b`.
This method calculates the push-forward measure. It can also construct
the implied inverse covariance (precision) using a saddle-point (KKT) system.
Args:
operator: The linear part of the mapping (A).
translation: The translation vector (b).
affine_operator: An AffineOperator instance (cannot be used with
`operator` or `translation`).
inverse_solver: A solver used to evaluate the KKT inverse covariance.
inverse_preconditioner: A preconditioner for the inverse_solver.
Returns:
A new GaussianMeasure representing the push-forward distribution.
Raises:
ValueError: If mutually exclusive arguments are provided, or if an
inverse solve is requested but the prior lacks an inverse covariance.
"""
if operator is None and affine_operator is None:
if translation is None:
return self
new_expectation = (
translation
if self.has_zero_expectation
else self.domain.add(self.expectation, translation)
)
if self.sample_set:
def new_sample() -> Vector:
return self.domain.add(self.sample(), translation)
else:
new_sample = None
return GaussianMeasure(
covariance=self._covariance,
covariance_factor=self._covariance_factor,
expectation=new_expectation,
sample=new_sample,
inverse_covariance=self._inverse_covariance,
inverse_covariance_factor=self._inverse_covariance_factor,
)
if affine_operator is not None:
if operator is not None or translation is not None:
raise ValueError(
"Cannot provide `affine_operator` alongside `operator` "
"or `translation`."
)
_operator = affine_operator.linear_part
_translation = affine_operator.translation_part
else:
_operator = (
operator if operator is not None else self.domain.identity_operator()
)
_translation = translation
if self.has_zero_expectation:
new_expectation = _translation
else:
mapped_exp = _operator(self.expectation)
if _translation is None:
new_expectation = mapped_exp
else:
new_expectation = _operator.codomain.add(mapped_exp, _translation)
if self.covariance_factor_set:
new_covariance_factor = _operator @ self.covariance_factor
new_covariance = None
else:
new_covariance_factor = None
new_covariance = _operator @ self.covariance @ _operator.adjoint
new_inverse_covariance = None
if inverse_solver is not None:
if not self.inverse_covariance_set:
raise ValueError(
"Cannot construct KKT inverse: the prior measure lacks an inverse covariance."
)
top_row = [self.inverse_covariance, _operator.adjoint]
bottom_row = [_operator, _operator.codomain.zero_operator()]
B = BlockLinearOperator([top_row, bottom_row])
if inverse_preconditioner is None and isinstance(
inverse_solver, IterativeLinearSolver
):
schur_op = (
new_covariance
if new_covariance is not None
else (new_covariance_factor @ new_covariance_factor.adjoint)
)
bottom_right_block = self._build_jacobi_schur_block(_operator, schur_op)
inverse_preconditioner = BlockDiagonalLinearOperator(
[self.covariance, bottom_right_block]
)
B_inv = inverse_solver(B, preconditioner=inverse_preconditioner)
proj = B_inv.codomain.subspace_projection(1)
incl = B_inv.domain.subspace_inclusion(1)
new_inverse_covariance = -1.0 * (proj @ B_inv @ incl)
if new_covariance_factor is not None:
return GaussianMeasure(
covariance_factor=new_covariance_factor,
expectation=new_expectation,
inverse_covariance=new_inverse_covariance,
)
else:
def new_sample() -> Vector:
mapped_sample = _operator(self.sample())
if _translation is None:
return mapped_sample
return _operator.codomain.add(mapped_sample, _translation)
return GaussianMeasure(
covariance=new_covariance,
expectation=new_expectation,
sample=new_sample if self.sample_set else None,
inverse_covariance=new_inverse_covariance,
)
[docs]
def as_multivariate_normal(
self, /, *, parallel: bool = False, n_jobs: int = -1
) -> multivariate_normal:
"""
Returns the measure as a `scipy.stats.multivariate_normal` object.
Args:
parallel: If True, evaluates the dense covariance matrix concurrently.
n_jobs: The number of CPU cores to use if parallel=True.
Returns:
A frozen scipy.stats.multivariate_normal object.
Raises:
NotImplementedError: If the measure is not defined on a EuclideanSpace.
"""
if not isinstance(self.domain, EuclideanSpace):
raise NotImplementedError(
"Method only defined for measures on Euclidean space."
)
mean_vector = self.expectation
cov_matrix = self.covariance.matrix(
dense=True, galerkin=True, parallel=parallel, n_jobs=n_jobs
)
try:
return multivariate_normal(
mean=mean_vector, cov=cov_matrix, allow_singular=True
)
except ValueError:
warnings.warn(
"Covariance matrix is not positive semi-definite due to "
"numerical errors. Setting negative eigenvalues to zero.",
UserWarning,
)
eigenvalues, eigenvectors = eigh(cov_matrix)
eigenvalues[eigenvalues < 0] = 0
cleaned_cov = eigenvectors @ diags(eigenvalues) @ eigenvectors.T
return multivariate_normal(
mean=mean_vector, cov=cleaned_cov, allow_singular=True
)
[docs]
def low_rank_approximation(
self,
size_estimate: int,
/,
*,
method: str = "variable",
max_rank: int = None,
power: int = 2,
rtol: float = 1e-4,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> GaussianMeasure:
"""
Constructs a low-rank approximation of the measure.
Uses randomized matrix-free algorithms to factorize the covariance.
Args:
size_estimate: Target rank or initial sample size for the algorithm.
method: 'variable' to sample dynamically, 'fixed' otherwise.
max_rank: Upper limit on rank for the 'variable' method.
power: Number of power iterations to enhance spectral decay.
rtol: Relative tolerance for the 'variable' method.
block_size: Samples drawn per iteration in the 'variable' method.
parallel: If True, parallelizes the evaluations.
n_jobs: Number of CPU cores to use if parallel=True.
Returns:
A new GaussianMeasure backed by a LowRankCholesky covariance factor.
"""
from .low_rank import LowRankCholesky
lr_cholesky = LowRankCholesky.from_randomized(
self.covariance,
size_estimate,
measure=None,
method=method,
max_rank=max_rank,
power=power,
rtol=rtol,
block_size=block_size,
parallel=parallel,
n_jobs=n_jobs,
)
return GaussianMeasure(
covariance_factor=lr_cholesky.l_factor,
expectation=self._expectation,
)
[docs]
def two_point_covariance(self, point: Any, /) -> Vector:
"""
Computes the two-point covariance function radiating from a specific point.
Args:
point: The spatial coordinate to evaluate from.
Returns:
The covariance field evaluated at the chosen point.
Raises:
NotImplementedError: If the domain lacks a `dirac_representation` method.
"""
if not hasattr(self.domain, "dirac_representation"):
raise NotImplementedError(
"Point evaluation is not defined for this measure's domain."
)
u = self.domain.dirac_representation(point)
cov = self.covariance
return cov(u)
[docs]
def directional_statistics(self, direction: Vector, /) -> Tuple[float, float]:
"""
Returns the expectation and variance of the scalar Gaussian <x, direction>.
Args:
direction: The test vector.
Returns:
A tuple containing (expectation, variance).
"""
expectation = (
0.0
if self.has_zero_expectation
else self.domain.inner_product(self.expectation, direction)
)
variance = self.domain.inner_product(self.covariance(direction), direction)
return expectation, variance
[docs]
def directional_covariance(self, d1: Vector, d2: Vector, /) -> float:
"""
Returns the covariance between the scalar projections <x, d1> and <x, d2>.
Args:
d1: The first test vector.
d2: The second test vector.
Returns:
The covariance scalar.
"""
return self.domain.inner_product(self.covariance(d1), d2)
[docs]
def directional_variance(self, d: Vector, /) -> float:
"""
Returns the variance of the scalar projection <x, d>.
Args:
d: The test vector.
Returns:
The variance scalar.
"""
return self.directional_covariance(d, d)
[docs]
def zero_expectation(self) -> GaussianMeasure:
"""
Returns a new measure with the same covariance, but zero expectation.
Returns:
A mean-shifted GaussianMeasure.
"""
if self.covariance_factor_set:
return GaussianMeasure(
covariance_factor=self.covariance_factor,
expectation=None,
)
if self.sample_set:
if self.has_zero_expectation:
new_sample = self.sample
else:
exp = self.expectation
def new_sample():
return self.domain.subtract(self.sample(), exp)
else:
new_sample = None
return GaussianMeasure(
covariance=self.covariance,
expectation=None,
sample=new_sample,
)
[docs]
def rescale_directional_variance(
self, direction: Vector, std: float, /
) -> GaussianMeasure:
"""
Returns a new measure where Var[<x, direction>] is scaled to std^2.
Args:
direction: The test vector to scale against.
std: The target standard deviation for the projection.
Returns:
A variance-scaled GaussianMeasure.
Raises:
ValueError: If the current directional variance is zero or negative.
"""
current_var = self.directional_variance(direction)
if current_var <= 0:
raise ValueError("Directional variance must be positive to rescale.")
norm = std / np.sqrt(current_var)
shifted_measure = self.zero_expectation()
scaled_measure = norm * shifted_measure
return scaled_measure.affine_mapping(translation=self._expectation)
[docs]
def kl_divergence(
self,
other: GaussianMeasure,
/,
*,
method: Literal["dense", "randomized"] = "dense",
hutchinson_size_estimate: int = 10,
hutchinson_method: Literal["variable", "fixed"] = "variable",
max_samples: Optional[int] = None,
rtol: float = 1e-2,
block_size: int = 5,
lanczos_size_estimate: int = 40,
lanczos_method: Literal["variable", "fixed"] = "variable",
lanczos_max_k: Optional[int] = None,
lanczos_rtol: float = 1e-3,
lanczos_atol: float = 1e-8,
lanczos_check_interval: int = 5,
parallel: bool = False,
n_jobs: int = -1,
) -> float:
"""
Computes the exact or approximate Kullback-Leibler (KL) divergence D_KL(self || other).
This calculates the divergence of 'self' (P) from the prior/reference
measure 'other' (Q).
Args:
other: The reference GaussianMeasure (Q).
method: 'dense' uses exact dense matrix factorizations (O(N^3)).
'randomized' uses matrix-free Stochastic Lanczos Quadrature (SLQ).
hutchinson_size_estimate: Initial samples for the randomized trace estimator.
hutchinson_method: 'variable' to sample until `rtol` is met, 'fixed' otherwise.
max_samples: Hard limit on Hutchinson samples.
rtol: Relative tolerance for the Hutchinson estimator.
block_size: Samples added per check in the 'variable' Hutchinson method.
lanczos_size_estimate: Initial Krylov dimension for fractional evaluations.
lanczos_method: 'variable' or 'fixed' convergence for Lanczos.
lanczos_max_k: Maximum Krylov dimension if 'variable' is used.
lanczos_rtol: Relative tolerance for Lanczos convergence.
lanczos_atol: Absolute tolerance for Lanczos convergence.
lanczos_check_interval: Iterations between Lanczos convergence checks.
parallel: If True, evaluates the stochastic probes concurrently.
n_jobs: Number of CPU cores to use if parallel=True.
Returns:
The calculated KL divergence.
Raises:
ValueError: If the measures reside on different domains, or if the 'randomized'
method is called without an inverse covariance on the reference measure.
"""
if self.domain != other.domain:
raise ValueError("Measures must be defined on the same domain.")
if method == "dense":
return self._kl_divergence_dense(other)
elif method == "randomized":
return self._kl_divergence_randomized(
other,
hutchinson_size_estimate=hutchinson_size_estimate,
hutchinson_method=hutchinson_method,
max_samples=max_samples,
rtol=rtol,
block_size=block_size,
lanczos_size_estimate=lanczos_size_estimate,
lanczos_method=lanczos_method,
lanczos_max_k=lanczos_max_k,
lanczos_rtol=lanczos_rtol,
lanczos_atol=lanczos_atol,
lanczos_check_interval=lanczos_check_interval,
parallel=parallel,
n_jobs=n_jobs,
)
else:
raise ValueError("method must be 'dense' or 'randomized'.")
# ---------------------------------------- #
# Special methods #
# ---------------------------------------- #
def __neg__(self) -> GaussianMeasure:
"""Returns a measure with a negated expectation."""
new_expectation = (
None
if self.has_zero_expectation
else self.domain.negative(self.expectation)
)
if self.covariance_factor_set:
return GaussianMeasure(
covariance_factor=self.covariance_factor,
expectation=new_expectation,
)
else:
new_sample = (
(lambda: self.domain.negative(self.sample()))
if self.sample_set
else None
)
return GaussianMeasure(
covariance=self.covariance,
expectation=new_expectation,
sample=new_sample,
)
def __mul__(self, alpha: float) -> GaussianMeasure:
"""Scales the measure by a scalar alpha."""
new_expectation = (
None
if self.has_zero_expectation
else self.domain.multiply(alpha, self.expectation)
)
if self.covariance_factor_set:
return GaussianMeasure(
covariance_factor=alpha * self.covariance_factor,
expectation=new_expectation,
)
new_sample = (
(lambda: self.domain.multiply(alpha, self.sample()))
if self.sample_set
else None
)
return GaussianMeasure(
covariance=alpha**2 * self.covariance,
expectation=new_expectation,
sample=new_sample,
)
def __rmul__(self, alpha: float) -> GaussianMeasure:
"""Scales the measure by a scalar alpha."""
return self * alpha
def __truediv__(self, a: float) -> GaussianMeasure:
"""Returns the division of the measure by a scalar."""
return self * (1.0 / a)
def __add__(self, other: GaussianMeasure) -> GaussianMeasure:
"""Adds two independent Gaussian measures defined on the same domain."""
if self.domain != other.domain:
raise ValueError("Measures must be defined on the same domain.")
if self.has_zero_expectation and other.has_zero_expectation:
new_expectation = None
elif self.has_zero_expectation:
new_expectation = other.expectation
elif other.has_zero_expectation:
new_expectation = self.expectation
else:
new_expectation = self.domain.add(self.expectation, other.expectation)
new_sample = (
(lambda: self.domain.add(self.sample(), other.sample()))
if self.sample_set and other.sample_set
else None
)
return GaussianMeasure(
covariance=self.covariance + other.covariance,
expectation=new_expectation,
sample=new_sample,
)
def __sub__(self, other: GaussianMeasure) -> GaussianMeasure:
"""Subtracts two independent Gaussian measures on the same domain."""
if self.domain != other.domain:
raise ValueError("Measures must be defined on the same domain.")
if self.has_zero_expectation and other.has_zero_expectation:
new_expectation = None
elif self.has_zero_expectation:
new_expectation = self.domain.negative(other.expectation)
elif other.has_zero_expectation:
new_expectation = self.expectation
else:
new_expectation = self.domain.subtract(self.expectation, other.expectation)
new_sample = (
(lambda: self.domain.subtract(self.sample(), other.sample()))
if self.sample_set and other.sample_set
else None
)
return GaussianMeasure(
covariance=self.covariance + other.covariance,
expectation=new_expectation,
sample=new_sample,
)
# ---------------------------------------- #
# Private methods #
# ---------------------------------------- #
def _sampling_radius(
self,
probability: float,
gauge_squared,
*,
n_samples: int = 10_000,
parallel: bool = False,
n_jobs: int = -1,
) -> float:
"""Empirical p-quantile of R(X)^2 over Monte Carlo draws."""
if not 0.0 < probability < 1.0:
raise ValueError("probability must lie strictly between 0 and 1.")
if not self.sample_set:
raise ValueError(
"Sampling-based radius requires a measure equipped with a "
"sample method."
)
samples = self.samples(n_samples, parallel=parallel, n_jobs=n_jobs)
mean = self.expectation
gauge_sq_values = np.empty(n_samples, dtype=float)
for i, x in enumerate(samples):
diff = self.domain.subtract(x, mean)
gauge_sq_values[i] = float(gauge_squared(diff))
return float(np.quantile(gauge_sq_values, probability))
def _sample_from_factor(self) -> Vector:
"""Default sampling method when a covariance factor is provided."""
covariance_factor = self.covariance_factor
w = covariance_factor.domain.random()
value = covariance_factor(w)
if self.has_zero_expectation:
return value
return self.domain.add(value, self.expectation)
def _build_jacobi_schur_block(
self, operator: LinearOperator, schur_op: LinearOperator
) -> LinearOperator:
"""Builds the inverse Jacobi (diagonal) approximation of the Schur complement."""
try:
n_data = operator.codomain.dim
if n_data <= 200:
schur_diag = schur_op.extract_diagonal(galerkin=True)
else:
from .low_rank import deflated_diagonal
schur_diag = deflated_diagonal(
schur_op, 10, 40, method="fixed", galerkin=True
)
max_var = np.max(schur_diag)
safe_diag = np.maximum(schur_diag, max_var * 1e-12)
inv_safe_diag = 1.0 / safe_diag
from .linear_operators import DiagonalSparseMatrixLinearOperator
return DiagonalSparseMatrixLinearOperator.from_diagonal_values(
operator.codomain, operator.codomain, inv_safe_diag, galerkin=True
)
except Exception:
return operator.codomain.identity_operator()
def _credible_set_chi2_ellipsoid(
self,
probability: float,
*,
rank: Optional[int],
open_set: bool,
):
"""Builds the classical Mahalanobis ellipsoid."""
effective_rank = self._resolve_rank(rank)
radius = float(np.sqrt(chi2.ppf(probability, df=effective_rank)))
from .subsets import Ellipsoid
return Ellipsoid(
self.domain,
self.expectation,
radius,
self.inverse_covariance,
open_set=open_set,
inverse_operator=self.covariance,
)
def _credible_set_chi2_cameron_martin(
self,
probability: float,
*,
rank: Optional[int],
open_set: bool,
):
"""Builds the unit ball in Cameron-Martin geometry."""
effective_rank = self._resolve_rank(rank)
radius = float(np.sqrt(chi2.ppf(probability, df=effective_rank)))
from .subsets import Ball
induced_domain_type = (
MassWeightedHilbertModule
if isinstance(self.domain, HilbertModule)
else MassWeightedHilbertSpace
)
induced_domain = induced_domain_type(
self.domain, self.inverse_covariance, self.covariance
)
return Ball(induced_domain, self.expectation, radius, open_set=open_set)
def _resolve_rank(self, rank: Optional[int]) -> int:
"""Resolves the chi-square degrees of freedom."""
if rank is None and self.domain.dim < 1:
raise ValueError(
"Rank must be provided for domains without a positive "
"finite dimension, such as basis-free function spaces."
)
effective_rank = self.domain.dim if rank is None else rank
if not isinstance(effective_rank, int) or effective_rank < 1:
raise ValueError("Rank must be a positive integer.")
return effective_rank
def _resolve_spectrum(
self,
spectrum,
spectrum_size: Optional[int],
spectrum_low_rank_kwargs: Optional[dict],
rng: Optional[np.random.Generator],
):
"""Resolves the covariance spectrum input into eigenvalues and objects."""
from .low_rank import LowRankEig
if spectrum is None:
if spectrum_size is None or spectrum_size < 1:
raise ValueError(
"spectrum_size (>=1) is required when spectrum is None."
)
kwargs = dict(spectrum_low_rank_kwargs or {})
eig = LowRankEig.from_randomized(
self.covariance,
spectrum_size,
measure=self,
**kwargs,
)
return np.asarray(eig.eigenvalues, dtype=float), eig
if isinstance(spectrum, LowRankEig):
return np.asarray(spectrum.eigenvalues, dtype=float), spectrum
if callable(spectrum):
if spectrum_size is None or spectrum_size < 1:
raise ValueError(
"spectrum_size (>=1) is required when spectrum is a callable."
)
eigvals = np.asarray(spectrum(spectrum_size), dtype=float).ravel()
return eigvals, None
eigvals = np.asarray(spectrum, dtype=float).ravel()
if eigvals.ndim != 1:
raise ValueError(f"spectrum array must be 1-D; got shape {eigvals.shape}.")
return eigvals, None
def _resolve_radius_method(self, radius_method: str, has_spectrum: bool) -> str:
"""Resolves the algorithm used to compute the credible radius."""
if radius_method == "spectral":
return "spectral"
if radius_method == "sampling":
if not self.sample_set:
raise ValueError(
"radius_method='sampling' requires a sampling-equipped "
"Gaussian measure."
)
return "sampling"
if radius_method == "auto":
if has_spectrum:
return "spectral"
if self.sample_set:
return "sampling"
raise ValueError(
"Cannot resolve a radius for this measure: pass a spectrum "
"or set radius_method='sampling' on a sampling-equipped measure."
)
raise ValueError("radius_method must be 'auto', 'spectral', or 'sampling'.")
def _credible_set_ambient_ball(
self,
probability: float,
*,
open_set: bool,
spectrum,
spectrum_size: Optional[int],
radius_method: str,
quantile_method: str,
quantile_tol: float,
n_samples: int,
spectrum_low_rank_kwargs: Optional[dict],
rng: Optional[np.random.Generator],
):
"""Builds the ambient norm ball using the specified radius method."""
from .subsets import Ball
from .quadratic_form_quantile import weighted_chi2_quantile
method = self._resolve_radius_method(
radius_method, has_spectrum=(spectrum is not None)
)
if method == "spectral":
eigvals, _ = self._resolve_spectrum(
spectrum, spectrum_size, spectrum_low_rank_kwargs, rng
)
r_p_sq = weighted_chi2_quantile(
eigvals, probability, method=quantile_method, tol=quantile_tol
)
else:
def gauge_squared(d):
return float(self.domain.squared_norm(d))
r_p_sq = self._sampling_radius(
probability, gauge_squared, n_samples=n_samples
)
r_p = float(np.sqrt(max(r_p_sq, 0.0)))
return Ball(self.domain, self.expectation, r_p, open_set=open_set)
def _credible_set_weakened_ellipsoid(
self,
probability: float,
*,
theta: float,
open_set: bool,
spectrum,
spectrum_size: Optional[int],
radius_method: str,
quantile_method: str,
quantile_tol: float,
fractional_apply: str,
n_samples: int,
lanczos_size_estimate: int,
lanczos_method: str,
lanczos_max_k: Optional[int],
lanczos_rtol: float,
lanczos_atol: float,
lanczos_check_interval: int,
spectrum_low_rank_kwargs: Optional[dict],
rng: Optional[np.random.Generator],
):
"""Builds the weakened-covariance ellipsoid."""
from .low_rank import LowRankEig
from .quadratic_form_quantile import weighted_chi2_quantile
from .subsets import Ellipsoid
method = self._resolve_radius_method(
radius_method, has_spectrum=(spectrum is not None)
)
if fractional_apply not in ("auto", "lanczos", "low_rank_eig"):
raise ValueError(
"fractional_apply must be 'auto', 'lanczos', or 'low_rank_eig'."
)
if fractional_apply == "auto":
if isinstance(spectrum, LowRankEig) or spectrum is None:
backend = "low_rank_eig"
else:
backend = "lanczos"
else:
backend = fractional_apply
eigvals: Optional[np.ndarray] = None
eig_obj: Optional[LowRankEig] = None
if method == "spectral" or backend == "low_rank_eig":
eigvals, eig_obj = self._resolve_spectrum(
spectrum, spectrum_size, spectrum_low_rank_kwargs, rng
)
A, A_inv, A_inv_sqrt = self._build_fractional_operators(
theta=theta,
backend=backend,
eig_obj=eig_obj,
lanczos_size_estimate=lanczos_size_estimate,
lanczos_method=lanczos_method,
lanczos_max_k=lanczos_max_k,
lanczos_rtol=lanczos_rtol,
lanczos_atol=lanczos_atol,
lanczos_check_interval=lanczos_check_interval,
)
if method == "spectral":
assert eigvals is not None
weights = np.power(eigvals, 1.0 - theta)
self._maybe_warn_trace_borderline(weights, theta, eigvals.size)
r_p_sq = weighted_chi2_quantile(
weights, probability, method=quantile_method, tol=quantile_tol
)
else:
def gauge_squared(d):
return float(self.domain.inner_product(A(d), d))
r_p_sq = self._sampling_radius(
probability, gauge_squared, n_samples=n_samples
)
r_p = float(np.sqrt(max(r_p_sq, 0.0)))
return Ellipsoid(
self.domain,
self.expectation,
r_p,
A,
open_set=open_set,
inverse_operator=A_inv,
inverse_sqrt_operator=A_inv_sqrt,
)
def _build_fractional_operators(
self,
*,
theta: float,
backend: str,
eig_obj,
lanczos_size_estimate: int,
lanczos_method: str,
lanczos_max_k: Optional[int],
lanczos_rtol: float,
lanczos_atol: float,
lanczos_check_interval: int,
) -> Tuple[LinearOperator, LinearOperator, LinearOperator]:
"""Builds the fractional metric operators required by the weakened ellipsoid."""
if backend == "low_rank_eig":
if eig_obj is None:
raise ValueError(
"fractional_apply='low_rank_eig' requires a "
"LowRankEig spectrum or spectrum=None (so it can be "
"computed); got an array/callable. Pass "
"fractional_apply='lanczos' instead."
)
from .spectral_operator import fractional_operators_from_eig
return fractional_operators_from_eig(eig_obj, theta)
from .functional_calculus import LanczosOperatorFunction
cov = self.covariance
def make(power: float) -> LinearOperator:
return LanczosOperatorFunction(
cov,
lambda x: np.power(x, power),
size_estimate=lanczos_size_estimate,
method=lanczos_method,
max_k=lanczos_max_k,
rtol=lanczos_rtol,
atol=lanczos_atol,
check_interval=lanczos_check_interval,
)
return make(-theta), make(theta), make(0.5 * theta)
def _maybe_warn_trace_borderline(
self,
weights: np.ndarray,
theta: float,
n: int,
) -> None:
"""Warns when the (1-theta)-trace condition is numerically borderline."""
if n < 20:
return
tail_share = float(np.sum(weights[n // 2 :])) / max(
float(np.sum(weights)), 1e-300
)
if tail_share > 0.3:
warnings.warn(
f"The (1-theta) trace tail of the resolved spectrum is "
f"{tail_share:.2f} of the total at theta={theta:.3f}; the "
"trace-class condition may be numerically borderline. "
"Increase spectrum_size or reduce theta to improve "
"reliability.",
UserWarning,
stacklevel=3,
)
def _kl_divergence_dense(self, other: GaussianMeasure) -> float:
"""Original dense matrix implementation of the KL divergence."""
k = self.domain.dim
G_p = self.covariance.matrix(dense=True, galerkin=True)
G_q = other.covariance.matrix(dense=True, galerkin=True)
try:
L_q = cholesky(G_q, lower=True)
except np.linalg.LinAlgError:
raise ValueError(
"Covariance matrix of the 'other' measure is not positive definite."
)
X = cho_solve((L_q, True), G_p)
trace_term = np.trace(X)
log_det_q = 2.0 * np.sum(np.log(np.diag(L_q)))
sign_p, log_det_p = np.linalg.slogdet(G_p)
if sign_p <= 0:
raise ValueError("Covariance matrix of 'self' is not positive definite.")
log_det_term = log_det_q - log_det_p
if self.has_zero_expectation and other.has_zero_expectation:
mahalanobis_term = 0.0
else:
diff = self.domain.subtract(self.expectation, other.expectation)
diff_dual = self.domain.to_dual(diff)
diff_c_prime = self.domain.dual.to_components(diff_dual)
y = cho_solve((L_q, True), diff_c_prime)
mahalanobis_term = np.dot(diff_c_prime, y)
return float(0.5 * (trace_term + mahalanobis_term - k + log_det_term))
def _kl_divergence_randomized(
self,
other: GaussianMeasure,
*,
hutchinson_size_estimate: int,
hutchinson_method: str,
max_samples: Optional[int],
rtol: float,
block_size: int,
lanczos_size_estimate: int,
lanczos_method: str,
lanczos_max_k: Optional[int],
lanczos_rtol: float,
lanczos_atol: float,
lanczos_check_interval: int,
parallel: bool,
n_jobs: int,
) -> float:
"""Matrix-free SLQ / Hutchinson estimation of the KL divergence."""
if not other.inverse_covariance_set:
raise ValueError(
"Randomized KL divergence requires the 'other' measure to "
"have an inverse covariance operator set."
)
space = self.domain
k = space.dim
if self.has_zero_expectation and other.has_zero_expectation:
mahalanobis_term = 0.0
else:
diff = space.subtract(self.expectation, other.expectation)
q_inv_diff = other.inverse_covariance(diff)
mahalanobis_term = space.inner_product(diff, q_inv_diff)
from .functional_calculus import operator_function_quadratic_form
if hasattr(space, "squared_norms"):
inv_norms = 1.0 / np.sqrt(space.squared_norms)
else:
inv_norms = np.array(
[
1.0 / np.sqrt(space.squared_norm(space.basis_vector(i)))
for i in range(space.dim)
]
)
def log_func(x: np.ndarray) -> np.ndarray:
return np.log(np.maximum(x, 1e-15))
def _single_sample() -> float:
c_z = np.random.choice([-1.0, 1.0], size=space.dim) * inv_norms
z = space.from_components(c_z)
p_z = self.covariance(z)
q_inv_p_z = other.inverse_covariance(p_z)
trace_val = space.inner_product(z, q_inv_p_z)
log_q = operator_function_quadratic_form(
other.covariance,
z,
log_func,
size_estimate=lanczos_size_estimate,
method=lanczos_method,
max_k=lanczos_max_k,
rtol=lanczos_rtol,
atol=lanczos_atol,
check_interval=lanczos_check_interval,
)
log_p = operator_function_quadratic_form(
self.covariance,
z,
log_func,
size_estimate=lanczos_size_estimate,
method=lanczos_method,
max_k=lanczos_max_k,
rtol=lanczos_rtol,
atol=lanczos_atol,
check_interval=lanczos_check_interval,
)
return trace_val + log_q - log_p
if max_samples is None:
max_samples = space.dim
num_samples = min(hutchinson_size_estimate, max_samples)
def _compute_sample_sum(n_samples_to_draw: int) -> float:
if parallel:
results = Parallel(n_jobs=n_jobs)(
delayed(_single_sample)() for _ in range(n_samples_to_draw)
)
return sum(results)
else:
return sum(_single_sample() for _ in range(n_samples_to_draw))
total_sum = _compute_sample_sum(num_samples)
stochastic_term = total_sum / num_samples if num_samples > 0 else 0.0
if hutchinson_method == "variable" and num_samples < max_samples:
while num_samples < max_samples:
old_est = stochastic_term
samples_to_add = min(block_size, max_samples - num_samples)
total_sum += _compute_sample_sum(samples_to_add)
total_samples = num_samples + samples_to_add
stochastic_term = total_sum / total_samples
if abs(stochastic_term) > 0:
error = abs(stochastic_term - old_est) / abs(stochastic_term)
if error < rtol:
break
num_samples = total_samples
return float(0.5 * (stochastic_term + mahalanobis_term - k))