"""
Implements randomized matrix-free algorithms for operators on Hilbert spaces.
This module provides fully abstract, matrix-free implementations of randomized
factorizations (SVD, Cholesky, Eigendecomposition). These algorithms operate
entirely via operator composition, adjoint mappings, and the intrinsic geometry
of the underlying `HilbertSpace` objects.
If a `GaussianMeasure` is provided, the algorithms draw structured samples to
respect mass matrices and continuous function space geometries. If no measure
is provided, they fall back to highly optimized, component-based matrix-free algorithms.
"""
from __future__ import annotations
from typing import List, Optional, Union, Callable
import warnings
import numpy as np
import scipy.linalg
from scipy.linalg import qr
from scipy.sparse.linalg import LinearOperator as ScipyLinOp
from .hilbert_space import Vector, EuclideanSpace, HilbertSpace
from .linear_operators import LinearOperator, DiagonalSparseMatrixLinearOperator
from .gaussian_measure import GaussianMeasure
from .parallel import parallel_mat_mat
# A type for objects that act like matrices
MatrixLike = Union[np.ndarray, ScipyLinOp]
[docs]
def white_noise_measure(domain: HilbertSpace) -> GaussianMeasure:
"""
Creates a formal N(0, I) "white noise" measure on the given domain.
WARNING: Mathematically, the identity operator is not trace-class in
infinite dimensions, meaning this does not define a valid Radon measure
on the Hilbert space. It is a cylinder measure.
In this library, it is used strictly as a numerical tool to generate
isotropic test vectors for randomized algorithms, ensuring that the
sampling perfectly respects the domain's inner product (e.g., mass matrices)
without biasing the range approximation.
"""
return GaussianMeasure.from_standard_deviation(domain, 1.0)
[docs]
class LowRankSVD(LinearOperator):
"""
A LinearOperator representing the low-rank SVD: A ≈ U @ Sigma @ V*.
This class encapsulates the components of a Singular Value Decomposition,
allowing it to be used directly as a `LinearOperator` while providing
access to the individual low-rank factors.
"""
def __init__(
self,
u_op: LinearOperator,
sigma_op: DiagonalSparseMatrixLinearOperator,
v_op: LinearOperator,
):
"""
Initializes the LowRankSVD operator.
Args:
u_op (LinearOperator): The left singular vectors operator (isometry).
sigma_op (DiagonalSparseMatrixLinearOperator): The diagonal operator of singular values.
v_op (LinearOperator): The right singular vectors operator (isometry).
"""
self._u_op = u_op
self._sigma_op = sigma_op
self._v_op = v_op
self._rank = sigma_op.domain.dim
full_op = self.u_factor @ self.sigma_factor @ self.v_factor.adjoint
super().__init__(
full_op.domain,
full_op.codomain,
full_op,
adjoint_mapping=full_op.adjoint,
)
@property
def rank(self) -> int:
"""int: The rank of the approximation."""
return self._rank
@property
def u_factor(self) -> LinearOperator:
"""LinearOperator: The left singular vectors (U)."""
return self._u_op
@property
def sigma_factor(self) -> DiagonalSparseMatrixLinearOperator:
"""DiagonalSparseMatrixLinearOperator: The diagonal matrix of singular values (Sigma)."""
return self._sigma_op
@property
def v_factor(self) -> LinearOperator:
"""LinearOperator: The right singular vectors (V)."""
return self._v_op
@property
def singular_values(self) -> np.ndarray:
"""np.ndarray: A 1D array of the computed singular values."""
return self._sigma_op.extract_diagonal()
[docs]
@classmethod
def from_randomized(
cls,
operator: LinearOperator,
size_estimate: int,
*,
measure: Optional[GaussianMeasure] = None,
galerkin: bool = False,
method: str = "variable",
max_rank: Optional[int] = None,
power: int = 2,
rtol: float = 1e-4,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> LowRankSVD:
"""
Computes the SVD using a unified randomized range finder.
Args:
operator (LinearOperator): The operator to approximate.
size_estimate (int): For 'fixed' method, the exact target rank. For 'variable'
method, this is the initial rank to sample.
measure (GaussianMeasure, optional): A prior measure used to draw test vectors.
If provided, respects the domain's geometry. If None, falls back to a
component-based SciPy LinearOperator representation.
galerkin (bool): If True, computes the Galerkin representation when falling back to components.
method (str): {'variable', 'fixed'}. The rank-determination algorithm to use.
max_rank (int, optional): Hard limit on the rank for the 'variable' method.
power (int): Number of power iterations to improve accuracy.
rtol (float): Relative tolerance for the 'variable' method.
block_size (int): Number of new vectors to sample per iteration in 'variable' method.
parallel (bool): Whether to parallelize the matrix/operator evaluations.
n_jobs (int): Number of cores to use if parallel=True (-1 for all).
Returns:
LowRankSVD: An instantiated operator containing the U, Sigma, and V factors.
"""
Q_op = random_range(
operator,
size_estimate,
measure=measure,
galerkin=galerkin,
method=method,
max_rank=max_rank,
power=power,
rtol=rtol,
block_size=block_size,
parallel=parallel,
n_jobs=n_jobs,
)
B = Q_op.adjoint @ operator
M_op = B @ B.adjoint
M_dense = M_op.matrix(dense=True)
eigenvalues, U_tilde = scipy.linalg.eigh(M_dense)
idx = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[idx]
U_tilde = U_tilde[:, idx]
sigmas = np.sqrt(np.maximum(eigenvalues, 0.0))
k = len(sigmas)
euclidean_k = EuclideanSpace(k)
U_tilde_op = LinearOperator.from_matrix(euclidean_k, euclidean_k, U_tilde)
u_op = Q_op @ U_tilde_op
sigma_op = DiagonalSparseMatrixLinearOperator.from_diagonal_values(
euclidean_k, euclidean_k, sigmas
)
inv_sigmas = np.zeros_like(sigmas)
valid = sigmas > 1e-12
inv_sigmas[valid] = 1.0 / sigmas[valid]
sigma_inv_op = DiagonalSparseMatrixLinearOperator.from_diagonal_values(
euclidean_k, euclidean_k, inv_sigmas
)
v_op = B.adjoint @ U_tilde_op @ sigma_inv_op
return cls(u_op, sigma_op, v_op)
[docs]
class LowRankEig(LinearOperator):
"""
A LinearOperator representing the eigendecomposition: A ≈ U @ D @ U*.
This class encapsulates the components of an Eigendecomposition for a
self-adjoint operator, allowing it to act as a `LinearOperator` while
exposing the eigenvectors and eigenvalues.
"""
def __init__(
self,
u_op: LinearOperator,
d_op: DiagonalSparseMatrixLinearOperator,
):
"""
Initializes the LowRankEig operator.
Args:
u_op (LinearOperator): The eigenvectors operator (U).
d_op (DiagonalSparseMatrixLinearOperator): The diagonal operator of eigenvalues (D).
"""
self._u_op = u_op
self._d_op = d_op
self._rank = d_op.domain.dim
full_op = self.u_factor @ self.d_factor @ self.u_factor.adjoint
super().__init__(
full_op.domain,
full_op.codomain,
full_op,
adjoint_mapping=full_op.adjoint,
)
@property
def rank(self) -> int:
"""int: The rank of the approximation."""
return self._rank
@property
def u_factor(self) -> LinearOperator:
"""LinearOperator: The eigenvectors (U)."""
return self._u_op
@property
def d_factor(self) -> DiagonalSparseMatrixLinearOperator:
"""DiagonalSparseMatrixLinearOperator: The diagonal matrix of eigenvalues (D)."""
return self._d_op
@property
def eigenvalues(self) -> np.ndarray:
"""np.ndarray: A 1D array of the computed eigenvalues."""
return self.d_factor.extract_diagonal()
[docs]
@classmethod
def from_randomized(
cls,
operator: LinearOperator,
size_estimate: int,
*,
measure: Optional[GaussianMeasure] = None,
galerkin: bool = True,
method: str = "variable",
max_rank: Optional[int] = None,
power: int = 2,
rtol: float = 1e-4,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> LowRankEig:
"""
Computes the Eigendecomposition using a unified randomized range finder.
Args:
operator (LinearOperator): The self-adjoint operator to approximate.
size_estimate (int): Target rank or initial block size.
measure (GaussianMeasure, optional): Prior measure for drawing test vectors.
galerkin (bool): Default True for Eig. Computes Galerkin representation on fallback.
method (str): {'variable', 'fixed'}.
max_rank (int, optional): Upper limit on rank for 'variable' method.
power (int): Number of power iterations.
rtol (float): Relative tolerance for 'variable' method.
block_size (int): Samples per iteration.
parallel (bool): Parallelize the sampling/multiplication.
n_jobs (int): CPU cores to utilize.
Returns:
LowRankEig: An instantiated operator containing the U and D factors.
Raises:
ValueError: If the operator is not an automorphism (domain != codomain).
"""
if operator.domain != operator.codomain:
raise ValueError("Eigendecomposition requires an automorphism.")
Q_op = random_range(
operator,
size_estimate,
measure=measure,
galerkin=galerkin,
method=method,
max_rank=max_rank,
power=power,
rtol=rtol,
block_size=block_size,
parallel=parallel,
n_jobs=n_jobs,
)
B = Q_op.adjoint @ operator @ Q_op
B_dense = B.matrix(dense=True)
eigenvalues, U_tilde = scipy.linalg.eigh(B_dense)
idx = np.argsort(eigenvalues)[::-1]
eigenvalues = eigenvalues[idx]
U_tilde = U_tilde[:, idx]
k = len(eigenvalues)
euclidean_k = EuclideanSpace(k)
U_tilde_op = LinearOperator.from_matrix(euclidean_k, euclidean_k, U_tilde)
u_op = Q_op @ U_tilde_op
d_op = DiagonalSparseMatrixLinearOperator.from_diagonal_values(
euclidean_k, euclidean_k, eigenvalues
)
return cls(u_op, d_op)
[docs]
def apply_function(
self, func: Callable[[np.ndarray], np.ndarray], *, regularization: float = 0.0
) -> LowRankEig:
"""
Applies a function to the spectrum of the operator.
Returns a new LowRankEig representing f(A).
"""
def safe_func(x: np.ndarray) -> np.ndarray:
return func(x + regularization)
new_d_op = self.d_factor.apply_function(safe_func)
# Return a new LowRankEig with the updated diagonal
return LowRankEig(self.u_factor, new_d_op)
[docs]
class LowRankCholesky(LinearOperator):
"""
A LinearOperator representing the Cholesky-like factorization: A ≈ L @ L*.
This class provides a memory-efficient low-rank Cholesky decomposition
of a positive semi-definite operator, highly useful for drawing samples
from Gaussian measures.
"""
def __init__(self, l_op: LinearOperator):
"""
Initializes the LowRankCholesky operator.
Args:
l_op (LinearOperator): The Cholesky factor (L) such that A ≈ L @ L*.
"""
self._l_op = l_op
self._rank = l_op.domain.dim
full_op = self.l_factor @ self.l_factor.adjoint
super().__init__(
full_op.domain,
full_op.codomain,
full_op,
adjoint_mapping=full_op.adjoint,
)
@property
def rank(self) -> int:
"""int: The rank of the approximation."""
return self._rank
@property
def l_factor(self) -> LinearOperator:
"""LinearOperator: The Cholesky factor (L)."""
return self._l_op
[docs]
@classmethod
def from_randomized(
cls,
operator: LinearOperator,
size_estimate: int,
*,
measure: Optional[GaussianMeasure] = None,
galerkin: bool = True,
method: str = "variable",
max_rank: Optional[int] = None,
power: int = 2,
rtol: float = 1e-4,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> LowRankCholesky:
"""
Computes a robust approximate Cholesky factorization via randomized range finding.
Attempts a direct dense Cholesky factorization on the projected core matrix.
If it fails (due to numerical precision issues), it safely falls back to
an Eigendecomposition-based square root.
Args:
operator (LinearOperator): Positive semi-definite operator to factorize.
size_estimate (int): Target rank or initial block size.
measure (GaussianMeasure, optional): Prior measure for drawing test vectors.
galerkin (bool): Default True. Computes Galerkin representation on fallback.
method (str): {'variable', 'fixed'}.
max_rank (int, optional): Upper limit on rank for 'variable' method.
power (int): Number of power iterations.
rtol (float): Relative tolerance for 'variable' method.
block_size (int): Samples per iteration.
parallel (bool): Parallelize the sampling/multiplication.
n_jobs (int): CPU cores to utilize.
Returns:
LowRankCholesky: An instantiated operator containing the L factor.
Raises:
ValueError: If the operator is not an automorphism.
"""
if operator.domain != operator.codomain:
raise ValueError("Cholesky requires an automorphism.")
Q_op = random_range(
operator,
size_estimate,
measure=measure,
galerkin=galerkin,
method=method,
max_rank=max_rank,
power=power,
rtol=rtol,
block_size=block_size,
parallel=parallel,
n_jobs=n_jobs,
)
B = Q_op.adjoint @ operator @ Q_op
B_dense = B.matrix(dense=True)
k = B_dense.shape[0]
try:
L = scipy.linalg.cholesky(B_dense, lower=True)
L_inv = scipy.linalg.solve_triangular(L, np.eye(k), lower=True)
transform_matrix = L_inv.T
except scipy.linalg.LinAlgError:
eigenvalues, U_tilde = scipy.linalg.eigh(B_dense)
max_eig = eigenvalues[-1] if len(eigenvalues) > 0 else 0
threshold = 1e-12 * max_eig if max_eig > 0 else 0
safe_eigs = np.where(eigenvalues >= threshold, eigenvalues, 0.0)
inv_sqrt_eigs = np.where(safe_eigs > 0, 1.0 / np.sqrt(safe_eigs), 0.0)
transform_matrix = U_tilde @ np.diag(inv_sqrt_eigs)
euclidean_k = EuclideanSpace(k)
transform_op = LinearOperator.from_matrix(
euclidean_k, euclidean_k, transform_matrix
)
l_op = operator @ Q_op @ transform_op
return cls(l_op)
# =====================================================================
# Unified Range Finder
# =====================================================================
[docs]
def random_range(
operator: LinearOperator,
size_estimate: int,
/,
*,
measure: Optional[GaussianMeasure] = None,
galerkin: bool = False,
method: str = "variable",
max_rank: Optional[int] = None,
power: int = 2,
rtol: float = 1e-4,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> LinearOperator:
"""
Unified random range finder acting as an architectural bridge.
If a `GaussianMeasure` is provided, it draws abstract structured samples
to respect Hilbert space geometries and mass matrices. If no measure is
provided, it routes to high-performance component-based representations
via SciPy `LinearOperator`s.
Args:
operator (LinearOperator): The linear operator whose range is to be approximated.
size_estimate (int): Target rank ('fixed') or initial sample size ('variable').
measure (GaussianMeasure, optional): Measure to draw test samples from.
galerkin (bool): If True, uses the Galerkin representation for the component fallback.
method (str): {'variable', 'fixed'}. Algorithm choice.
max_rank (int, optional): Hard limit on rank for variable sampling.
power (int): Number of power iterations to enhance singular value decay.
rtol (float): Relative tolerance for convergence checking.
block_size (int): Size of new sample batches.
parallel (bool): Parallelize computations where applicable.
n_jobs (int): CPU cores to use.
Returns:
LinearOperator: The isometry Q mapping from Euclidean(k) into the codomain.
"""
# --- PATH A: Abstract, Measure-Based (Matrix-Free) ---
if measure is not None:
if method == "variable":
q_basis = _abstract_variable_rank_random_range(
operator,
measure,
size_estimate,
max_rank=max_rank,
power=power,
block_size=block_size,
rtol=rtol,
parallel=parallel,
n_jobs=n_jobs,
)
elif method == "fixed":
if any([rtol != 1e-4, block_size != 10, max_rank is not None]):
warnings.warn(
"'rtol', 'block_size', and 'max_rank' are ignored when method='fixed'.",
UserWarning,
)
q_basis = _abstract_fixed_rank_random_range(
operator,
measure,
size_estimate,
power=power,
parallel=parallel,
n_jobs=n_jobs,
)
else:
raise ValueError(
f"Unknown method '{method}'. Choose from 'fixed' or 'variable'."
)
return LinearOperator.from_vectors(operator.codomain, q_basis).adjoint
# --- PATH B: Component-Based Fallback ---
else:
matrix_repr = operator.matrix(
dense=False, galerkin=galerkin, parallel=parallel, n_jobs=n_jobs
)
if method == "variable":
q_matrix = _component_variable_rank_random_range(
matrix_repr,
size_estimate,
max_rank=max_rank,
power=power,
block_size=block_size,
rtol=rtol,
parallel=parallel,
n_jobs=n_jobs,
)
elif method == "fixed":
if any([rtol != 1e-4, block_size != 10, max_rank is not None]):
warnings.warn(
"'rtol', 'block_size', and 'max_rank' are ignored when method='fixed'.",
UserWarning,
)
q_matrix = _component_fixed_rank_random_range(
matrix_repr,
size_estimate,
power=power,
parallel=parallel,
n_jobs=n_jobs,
)
else:
raise ValueError(
f"Unknown method '{method}'. Choose from 'fixed' or 'variable'."
)
k = q_matrix.shape[1]
euclidean_k = EuclideanSpace(k)
return LinearOperator.from_matrix(
euclidean_k, operator.codomain, q_matrix, galerkin=False
)
# =====================================================================
# Internal Abstract (Measure-Based) Routines
# =====================================================================
def _abstract_fixed_rank_random_range(
operator: LinearOperator,
measure: GaussianMeasure,
rank: int,
/,
*,
power: int = 0,
parallel: bool = False,
n_jobs: int = -1,
) -> List[Vector]:
"""
Internal. Computes an abstract matrix-free fixed-rank range approximation.
Draws exactly `rank` samples from the measure and applies Modified Gram-Schmidt.
"""
if operator.domain != measure.domain:
raise ValueError("The measure must be defined on the operator's domain.")
omega_samples = measure.samples(rank, parallel=parallel, n_jobs=n_jobs)
y_vectors = [operator(w) for w in omega_samples]
q_basis = operator.codomain.gram_schmidt(y_vectors)
for _ in range(power):
z_vectors = [operator.adjoint(q) for q in q_basis]
z_basis = operator.domain.gram_schmidt(z_vectors)
y_vectors = [operator(z) for z in z_basis]
q_basis = operator.codomain.gram_schmidt(y_vectors)
return q_basis
def _abstract_variable_rank_random_range(
operator: LinearOperator,
measure: GaussianMeasure,
initial_rank: int,
/,
*,
max_rank: Optional[int] = None,
power: int = 0,
block_size: int = 10,
rtol: float = 1e-4,
parallel: bool = False,
n_jobs: int = -1,
) -> List[Vector]:
"""
Internal. Computes an abstract matrix-free variable-rank range approximation.
Progressively samples blocks of random vectors from the measure until the
residual projection falls below the defined relative tolerance.
"""
if operator.domain != measure.domain:
raise ValueError("The measure must be defined on the operator's domain.")
if max_rank is None:
max_rank = min(operator.domain.dim, operator.codomain.dim)
q_basis = _abstract_fixed_rank_random_range(
operator, measure, initial_rank, power=power, parallel=parallel, n_jobs=n_jobs
)
tol = None
while len(q_basis) < max_rank:
test_samples = measure.samples(block_size, parallel=parallel, n_jobs=n_jobs)
y_test = [operator(w) for w in test_samples]
if tol is None:
norms_sq = [operator.codomain.squared_norm(y) for y in y_test]
norm_estimate = np.sqrt(sum(norms_sq)) / np.sqrt(block_size)
tol = rtol * norm_estimate
residuals = []
max_err = 0.0
for y in y_test:
y_proj = operator.codomain.zero
for q in q_basis:
proj = operator.codomain.inner_product(q, y)
operator.codomain.axpy(proj, q, y_proj)
res = operator.codomain.subtract(y, y_proj)
err = operator.codomain.norm(res)
max_err = max(max_err, err)
if err > 1e-12:
residuals.append(res)
if max_err < tol:
break
if not residuals:
break
try:
new_basis = operator.codomain.gram_schmidt(residuals)
cols_to_add = min(len(new_basis), max_rank - len(q_basis))
q_basis.extend(new_basis[:cols_to_add])
except ValueError:
break
return q_basis
# =====================================================================
# Internal Component-Based Routines
# =====================================================================
def _component_fixed_rank_random_range(
matrix: MatrixLike,
rank: int,
*,
power: int = 0,
parallel: bool = False,
n_jobs: int = -1,
) -> np.ndarray:
"""
Internal. Computes a fixed-rank approximation of a matrix's range.
Generates standard normal component arrays and uses SciPy QR decomposition
for fast Euclidean subspace extraction.
"""
m, n = matrix.shape
random_matrix = np.random.randn(n, rank)
if parallel:
product_matrix = parallel_mat_mat(matrix, random_matrix, n_jobs)
else:
product_matrix = matrix @ random_matrix
qr_factor, _ = qr(product_matrix, overwrite_a=True, mode="economic")
for _ in range(power):
if parallel:
tilde_product_matrix = parallel_mat_mat(matrix.T, qr_factor, n_jobs)
else:
tilde_product_matrix = matrix.T @ qr_factor
tilde_qr_factor, _ = qr(tilde_product_matrix, overwrite_a=True, mode="economic")
if parallel:
product_matrix = parallel_mat_mat(matrix, tilde_qr_factor, n_jobs)
else:
product_matrix = matrix @ tilde_qr_factor
qr_factor, _ = qr(product_matrix, overwrite_a=True, mode="economic")
return qr_factor
def _component_variable_rank_random_range(
matrix: MatrixLike,
initial_rank: int,
*,
max_rank: Optional[int] = None,
power: int = 0,
block_size: int = 10,
rtol: float = 1e-4,
parallel: bool = False,
n_jobs: int = -1,
) -> np.ndarray:
"""
Internal. Computes a variable-rank orthonormal basis via progressive sampling.
Applies standard Hutchinson-style block updates using component representations.
"""
m, n = matrix.shape
if max_rank is None:
max_rank = min(m, n)
# Initial Sample
random_matrix = np.random.randn(n, initial_rank)
if parallel:
ys = parallel_mat_mat(matrix, random_matrix, n_jobs)
else:
ys = matrix @ random_matrix
# Power Iterations on initial sample
for _ in range(power):
ys, _ = qr(ys, mode="economic")
if parallel:
ys_tilde = parallel_mat_mat(matrix.T, ys, n_jobs)
ys = parallel_mat_mat(matrix, ys_tilde, n_jobs)
else:
ys_tilde = matrix.T @ ys
ys = matrix @ ys_tilde
basis_vectors, _ = qr(ys, mode="economic")
tol = None
while basis_vectors.shape[1] < max_rank:
test_vectors = np.random.randn(n, block_size)
if parallel:
y_test = parallel_mat_mat(matrix, test_vectors, n_jobs)
else:
y_test = matrix @ test_vectors
if tol is None:
norm_estimate = np.linalg.norm(y_test) / np.sqrt(block_size)
tol = rtol * norm_estimate
residual = y_test - basis_vectors @ (basis_vectors.T @ y_test)
error = np.linalg.norm(residual, ord=2)
if error < tol:
break
new_basis, _ = qr(residual, mode="economic")
cols_to_add = min(new_basis.shape[1], max_rank - basis_vectors.shape[1])
if cols_to_add <= 0:
break
basis_vectors = np.hstack([basis_vectors, new_basis[:, :cols_to_add]])
return basis_vectors
# =====================================================================
# Specialized Matrix-Based Diagonal Estimator
# =====================================================================
[docs]
def random_diagonal(
matrix: MatrixLike,
size_estimate: int,
/,
*,
method: str = "variable",
use_rademacher: bool = False,
max_samples: Optional[int] = None,
rtol: float = 1e-2,
block_size: int = 10,
parallel: bool = False,
n_jobs: int = -1,
) -> np.ndarray:
"""
Computes an approximate diagonal of a square matrix using Hutchinson's method.
This algorithm uses a progressive, iterative approach to estimate the diagonal.
It starts with an initial number of samples and adds new blocks of random
vectors until the estimate of the diagonal converges to a specified tolerance.
Note: This is a specialized, component-based implementation relying on
element-wise array multiplication.
Args:
matrix: The (n, n) matrix or LinearOperator to analyze.
size_estimate: For 'fixed' method, the exact target rank. For 'variable'
method, this is the initial rank to sample.
method ({'variable', 'fixed'}): The algorithm to use.
- 'variable': Progressively samples to meet tolerance `rtol`.
- 'fixed': Returns an estimate based on exactly `size_estimate` samples.
use_rademacher: If true, draw components from [-1,1]. Default method draws
normally distributed components.
max_samples: For 'variable' method, a hard limit on the number of samples.
Ignored if method='fixed'. Defaults to dimension of matrix.
rtol: Relative tolerance for the 'variable' method.
block_size: Number of new vectors to sample per iteration in 'variable' method.
parallel: Whether to use parallel matrix multiplication.
n_jobs: Number of jobs for parallelism.
Returns:
np.ndarray: A 1D numpy array of size n containing the approximate diagonal.
"""
m, n = matrix.shape
if m != n:
raise ValueError("Input matrix must be square to estimate a diagonal.")
if max_samples is None:
max_samples = n
num_samples = min(size_estimate, max_samples)
if use_rademacher:
z = np.random.choice([-1.0, 1.0], size=(n, num_samples))
else:
z = np.random.randn(n, num_samples)
if parallel:
az = parallel_mat_mat(matrix, z, n_jobs)
else:
az = matrix @ z
diag_sum = np.sum(z * az, axis=1)
diag_estimate = diag_sum / num_samples
if method == "fixed" or num_samples >= max_samples:
return diag_estimate
while num_samples < max_samples:
old_diag_estimate = diag_estimate.copy()
# Generate a NEW block of random vectors
samples_to_add = min(block_size, max_samples - num_samples)
if use_rademacher:
z_new = np.random.choice([-1.0, 1.0], size=(n, samples_to_add))
else:
z_new = np.random.randn(n, samples_to_add)
if parallel:
az_new = parallel_mat_mat(matrix, z_new, n_jobs)
else:
az_new = matrix @ z_new
new_diag_sum = np.sum(z_new * az_new, axis=1)
# Update the running average
total_samples = num_samples + samples_to_add
diag_estimate = (diag_sum + new_diag_sum) / total_samples
# Check for convergence
norm_new_diag = np.linalg.norm(diag_estimate)
if norm_new_diag > 0:
error = np.linalg.norm(diag_estimate - old_diag_estimate) / norm_new_diag
if error < rtol:
break
# Update sums and counts for next iteration
diag_sum += new_diag_sum
num_samples = total_samples
return diag_estimate
[docs]
def deflated_diagonal(
operator: LinearOperator,
rank: int,
size_estimate: int,
/,
*,
method: str = "variable",
use_rademacher: bool = True,
max_samples: Optional[int] = None,
rtol: float = 1e-2,
block_size: int = 10,
galerkin: bool = False,
parallel: bool = False,
n_jobs: int = -1,
) -> np.ndarray:
"""
Estimates the diagonal of a square operator's component matrix using SVD deflation.
This combines a deterministic low-rank diagonal approximation (via SVD)
with a stochastic estimate of the residual diagonal (via Hutchinson's).
Args:
operator: The square LinearOperator to analyze.
rank: The rank of the deterministic SVD deflation.
size_estimate: Initial number of samples for the stochastic residual.
method: 'variable' or 'fixed' for the stochastic residual phase.
use_rademacher: If True, uses [-1, 1] Rademacher noise for Hutchinson's.
max_samples: Hard limit on residual samples.
rtol: Relative tolerance for the stochastic residual phase.
block_size: Samples added per iteration in the stochastic phase.
galerkin: If True, computes the diagonal of the Galerkin matrix.
parallel: Whether to compute operations in parallel.
n_jobs: Number of CPU cores to utilize.
Returns:
np.ndarray: A 1D array representing the diagonal of the operator.
"""
if operator.domain.dim != operator.codomain.dim:
raise ValueError("Operator must be square to extract a diagonal.")
if rank < 0 or size_estimate < 0:
raise ValueError("Rank and size_estimate must be non-negative.")
dim = operator.domain.dim
total_diagonal = np.zeros(dim)
# -------------------------------------------------------------
# 1. Deterministic Low-Rank Diagonal via SVD
# -------------------------------------------------------------
if rank > 0:
svd_op = LowRankSVD.from_randomized(
operator,
rank,
method="fixed",
galerkin=galerkin,
parallel=parallel,
n_jobs=n_jobs,
)
U_mat = svd_op.u_factor.matrix(dense=True, galerkin=galerkin)
V_mat = svd_op.v_factor.matrix(dense=True, galerkin=galerkin)
S_vec = svd_op.singular_values
# Diag(U @ S @ V^T)_i = sum_k U_{ik} S_k V_{ik}
total_diagonal += np.sum(U_mat * S_vec * V_mat, axis=1)
residual_op = operator - svd_op
else:
residual_op = operator
# -------------------------------------------------------------
# 2. Stochastic Residual Diagonal
# -------------------------------------------------------------
if size_estimate > 0:
scipy_residual_wrapper = residual_op.matrix(galerkin=galerkin)
stochastic_diag = random_diagonal(
scipy_residual_wrapper,
size_estimate,
method=method,
use_rademacher=use_rademacher,
max_samples=max_samples,
rtol=rtol,
block_size=block_size,
parallel=parallel,
n_jobs=n_jobs,
)
total_diagonal += stochastic_diag
return total_diagonal