"""
Provides an abstract framework for function spaces on symmetric manifolds.
This module offers an abstract framework for defining Hilbert spaces of
functions on symmetric spaces (like spheres or tori). The core design
leverages the spectral properties of the Laplace-Beltrami operator (Δ).
By inheriting from these base classes and implementing a small number of abstract
methods a concrete class can automatically gain a rich set of tools for defining
invariant operators and probability measures.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import (
Callable,
Any,
List,
Optional,
TypeAlias,
Iterator,
Tuple,
Union,
Literal,
)
import numpy as np
from scipy.sparse import diags
import scipy.sparse as sps
import scipy.sparse.linalg as splinalg
from pygeoinf.hilbert_space import (
HilbertSpace,
HilbertModule,
Vector,
MassWeightedHilbertModule,
EuclideanSpace,
)
from pygeoinf.linear_operators import LinearOperator, DiagonalSparseMatrixLinearOperator
from pygeoinf.linear_forms import LinearForm
from pygeoinf.gaussian_measure import GaussianMeasure
from pygeoinf.affine_operators import AffineOperator
from pygeoinf.linear_solvers import IterativeLinearSolver, LinearSolver, CGSolver
from pygeoinf.direct_sum import BlockDiagonalLinearOperator
# Alias for the index for the eigenvalues or eigenfunctions
Index: TypeAlias = int | tuple[int, ...]
# Alias for the truncation degree
Degree: TypeAlias = int | tuple[int, ...]
# Alias for a point within the symmetric manifold
Point: TypeAlias = float | tuple[float, ...]
# Alias for the value type of the fields
Value: TypeAlias = float | np.ndarray
[docs]
class InvariantLinearAutomorphism(DiagonalSparseMatrixLinearOperator):
"""
A class for LinearOperators on SymmetricHilbertSpaces
that are invariant under the symmetry group. Such operators
can be expressed as a function of the Laplace-Beltrami operator
and are diagonal within the eigenfunction basis.
"""
def __init__(
self,
domain: SymmetricHilbertSpace,
eigenvalues: np.ndarray,
):
"""
Args:
domain: The domain of the operator
eigenvalues: A vector of the operator's eigenvalues
"""
diagonals = (eigenvalues.reshape(1, -1), [0])
super().__init__(domain, domain, diagonals)
[docs]
@staticmethod
def from_index_function(
domain: SymmetricHilbertSpace, g: Callable[Index, float]
) -> InvariantLinearAutomorphism:
"""
Returns an invariant linear automorphism on a symmetric Hilbert space of the form
f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ.
Here the function, f, is expressed implicitly as a function, g, of the
eigenvalue index.
Args:
domain: The domain of the operator
g: The function expressed in terms of the eigenvalue index
"""
eigenvalues = np.array([g(index) for index in domain.indices], dtype=float)
return InvariantLinearAutomorphism(domain, eigenvalues)
[docs]
@staticmethod
def from_function(
domain: SymmetricHilbertSpace, f: Callable[float, float]
) -> InvariantLinearAutomorphism:
"""
Returns an invariant linear automorphism on a symmetric Hilbert space of the form
f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ.
Args:
domain: The domain of the operator
f: The function
"""
return InvariantLinearAutomorphism.from_index_function(
domain, lambda k: f(domain.laplacian_eigenvalue(k))
)
@property
def eigenvalues(self) -> np.ndarray:
"""Returns the operator's eigenvalues (the diagonal multipliers)."""
return self.extract_diagonal(galerkin=False)
@property
def trace(self) -> float:
"""
Returns the trace of the automorphism.
"""
return np.sum(self.eigenvalues)
@property
def inverse(self) -> InvariantLinearAutomorphism:
"""
Returns the inverse of the invariant automorphism.
Since the operator is diagonal in the spectral basis, the inverse
is formed by taking the reciprocal of the eigenvalues.
"""
current_eigenvalues = self.eigenvalues
if np.any(current_eigenvalues == 0):
raise ValueError("Cannot invert an operator with zero eigenvalues.")
return InvariantLinearAutomorphism(
self.domain, np.reciprocal(current_eigenvalues)
)
def __neg__(self) -> InvariantLinearAutomorphism:
current_diagonal = self.eigenvalues
return InvariantLinearAutomorphism(self.domain, -current_diagonal)
def __mul__(self, alpha: float) -> InvariantLinearAutomorphism:
current_diagonal = self.eigenvalues
return InvariantLinearAutomorphism(self.domain, alpha * current_diagonal)
def __rmul__(self, alpha: float) -> InvariantLinearAutomorphism:
return self * alpha
def __add__(
self, other: InvariantLinearAutomorphism | LinearOperator
) -> InvariantLinearAutomorphism | LinearOperator:
if isinstance(other, InvariantLinearAutomorphism):
if self.domain != other.domain:
raise ValueError("Domains must match.")
my_diagonal = self.eigenvalues
other_diagonal = other.eigenvalues
return InvariantLinearAutomorphism(
self.domain, my_diagonal + other_diagonal
)
return super().__add__(other)
def __sub__(
self, other: InvariantLinearAutomorphism | LinearOperator
) -> InvariantLinearAutomorphism | LinearOperator:
if isinstance(other, InvariantLinearAutomorphism):
if self.domain != other.domain:
raise ValueError("Domains must match.")
my_diagonal = self.eigenvalues
other_diagonal = other.eigenvalues
return InvariantLinearAutomorphism(
self.domain, my_diagonal - other_diagonal
)
return super().__sub__(other)
def __matmul__(
self, other: InvariantLinearAutomorphism | LinearOperator
) -> InvariantLinearAutomorphism | LinearOperator:
"""Composing two invariant operators via element-wise multiplication."""
if isinstance(other, InvariantLinearAutomorphism):
if self.codomain != other.domain:
raise ValueError("Domain/Codomain mismatch.")
my_diagonal = self.eigenvalues
other_diagonal = other.eigenvalues
return InvariantLinearAutomorphism(
self.domain, my_diagonal * other_diagonal
)
return super().__matmul__(other)
[docs]
class InvariantGaussianMeasure(GaussianMeasure):
"""
A class for GaussianMeasures on SymmetricHilbertSpaces
whose covariances are invariant under the symmetry group. The
covariances can be expressed as a function of the Laplace-Beltrami
operator and are diagonal within the eigenfunction basis.
"""
def __init__(
self,
domain: SymmetricHilbertSpace,
spectral_variances: np.ndarray,
/,
*,
expectation: Optional[Vector] = None,
):
"""
Initializes the InvariantGaussianMeasure.
Args:
domain: The symmetric space the measure is defined on.
spectral_variances: A 1D array of variances associated with the
eigenbasis (i.e., the eigenvalues of the covariance operator).
expectation: The mean vector. Defaults to zero.
"""
self._spectral_variances = spectral_variances
covariance = InvariantLinearAutomorphism(domain, spectral_variances)
squared_norms = domain.squared_norms
self._kl_scaling_array = np.sqrt(spectral_variances / squared_norms)
inverse_covariance = None
if np.all(spectral_variances > 0):
inverse_covariance = InvariantLinearAutomorphism(
domain, np.reciprocal(spectral_variances)
)
super().__init__(
covariance=covariance,
expectation=expectation,
sample=self._kl_sample,
inverse_covariance=inverse_covariance,
)
# ---------------------------------------------------------- #
# Constructors #
# ---------------------------------------------------------- #
[docs]
@staticmethod
def from_index_function(
domain: SymmetricHilbertSpace, g: Callable[Index, float], /, *, expectation=None
) -> InvariantGaussianMeasure:
"""
Returns an invariant Gaussian measure on a symmetric Hilbert space whose
covariance is of the form f(Δ) with f a function that is well-defined on
the spectrum of the Laplacian, Δ.
Here the function, f, is expressed implicitly as a function, g, of the
eigenvalue index.
Args:
domain: The domain of the operator
g: The function expressed in terms of the eigenvalue index
expectation: The expected value for the measure. Defaults to None which means zero.
"""
spectral_variances = np.fromiter(
(g(k) for k in domain.indices),
dtype=float,
count=domain.dim,
)
return InvariantGaussianMeasure(
domain, spectral_variances, expectation=expectation
)
[docs]
@staticmethod
def from_function(
domain: SymmetricHilbertSpace, f: Callable[float, float], /, *, expectation=None
) -> InvariantGaussianMeasure:
"""
Returns an invariant Gaussian measure on a symmetric Hilbert space whose
covariance is of the form f(Δ) with f a function that is well-defined on
the spectrum of the Laplacian, Δ.
Args:
domain: The domain of the operator
f: The function expressed in terms of the eigenvalue index
expectation: The expected value for the measure. Defaults to None which means zero.
"""
return InvariantGaussianMeasure.from_index_function(
domain, lambda k: f(domain.laplacian_eigenvalue(k)), expectation=expectation
)
# ---------------------------------------------------------- #
# Properties #
# ---------------------------------------------------------- #
@property
def spectral_variances(self) -> np.ndarray:
"""
Provides instant access to the exact eigenvalues of the covariance
operator, useful for log-determinant or KL-divergence calculations.
"""
return self._spectral_variances
# ---------------------------------------------------------- #
# Public methods #
# ---------------------------------------------------------- #
[docs]
def zero_expectation(self) -> InvariantGaussianMeasure:
"""
Returns a new invariant measure with the same covariance, but
with expectation set to zero.
"""
if self.has_zero_expectation:
return self
return InvariantGaussianMeasure(
self.domain,
self.spectral_variances,
expectation=None,
)
[docs]
def rescale_norm_variance(self, std: float) -> InvariantGaussianMeasure:
"""
Returns a new measure whose covariance is scaled such that
E[||x - E[x]||^2] = std^2.
The expectation of the measure is unchanged.
"""
current_trace = self.covariance.trace
if current_trace <= 0:
raise ValueError("Trace must be positive to perform rescaling.")
scale_factor_squared = (std**2) / current_trace
new_variances = self.spectral_variances * scale_factor_squared
return InvariantGaussianMeasure(
self.domain,
new_variances,
expectation=self._expectation, # Pass the raw attribute to avoid zero instantiation
)
# ------------------------------------------------------#
# Overloads of base class methods #
# ------------------------------------------------------#
[docs]
def affine_mapping(
self,
/,
*,
operator: LinearOperator = None,
translation: Vector = None,
affine_operator: AffineOperator = None,
inverse_solver: LinearSolver = None,
inverse_preconditioner: LinearOperator = None,
) -> GaussianMeasure:
"""
Transforms the measure under an affine map `y = A(x) + b`.
If the operator A is an `InvariantLinearAutomorphism` (or None), the
resulting measure remains invariant. If it is a generic operator and
an inverse solve is requested, a spectrally-regularized preconditioner
is automatically built.
"""
if operator is None and affine_operator is None:
new_expectation = (
translation
if self.has_zero_expectation
else (
self.domain.add(self.expectation, translation)
if translation
else self.expectation
)
)
return InvariantGaussianMeasure(
self.domain, self.spectral_variances, expectation=new_expectation
)
_op = affine_operator.linear_part if affine_operator else operator
if isinstance(_op, InvariantLinearAutomorphism):
new_covariance = self.covariance @ _op @ _op
if self.has_zero_expectation:
new_expectation = (
translation
if affine_operator is None
else affine_operator.translation_part
)
else:
mapped_exp = _op(self.expectation)
_trans = (
affine_operator.translation_part if affine_operator else translation
)
new_expectation = (
self.domain.add(mapped_exp, _trans) if _trans else mapped_exp
)
return InvariantGaussianMeasure(
self.domain, new_covariance.eigenvalues, expectation=new_expectation
)
if inverse_solver is not None and inverse_preconditioner is None:
if isinstance(inverse_solver, IterativeLinearSolver):
# A. Top-Left Block: "Water-filling" spectral regularization
max_var = np.max(self.spectral_variances)
threshold = max_var * 1e-8
reg_variances = np.maximum(self.spectral_variances, threshold)
reg_cov = InvariantLinearAutomorphism(self.domain, reg_variances)
# B. Bottom-Right Block: Delegate Jacobi Schur logic to the base class
new_covariance = _op @ self.covariance @ _op.adjoint
bottom_right_block = self._build_jacobi_schur_block(_op, new_covariance)
inverse_preconditioner = BlockDiagonalLinearOperator(
[reg_cov, bottom_right_block]
)
# 3. Defer to the base class with the injected smart preconditioner
return super().affine_mapping(
operator=operator,
translation=translation,
affine_operator=affine_operator,
inverse_solver=inverse_solver,
inverse_preconditioner=inverse_preconditioner,
)
[docs]
def kl_divergence(
self,
other: GaussianMeasure,
/,
*,
method: Literal["auto", "dense", "randomized"] = "auto",
**kwargs,
) -> float:
"""
Computes the exact or approximate Kullback-Leibler (KL) divergence D_KL(self || other).
If both measures are InvariantGaussianMeasures on the same domain,
and method="auto", this calculates the divergence in ultra-fast O(N)
time directly from the spectral variances.
"""
if (
method == "auto"
and isinstance(other, InvariantGaussianMeasure)
and self.domain == other.domain
):
k = self.domain.dim
var_p = self.spectral_variances
var_q = other.spectral_variances
# 1. Trace Term: Tr(Q^-1 P)
trace_term = np.sum(var_p / var_q)
# 2. Log-Det Term: ln|Q| - ln|P|
log_det_term = np.sum(np.log(var_q)) - np.sum(np.log(var_p))
# 3. Mahalanobis Term: <mu_P - mu_Q, Q^-1 (mu_P - mu_Q)>
if self.has_zero_expectation and other.has_zero_expectation:
mahalanobis_term = 0.0
else:
diff = self.domain.subtract(self.expectation, other.expectation)
inv_Q = other.covariance.inverse
diff_invQ = inv_Q(diff)
mahalanobis_term = self.domain.inner_product(diff, diff_invQ)
return float(0.5 * (trace_term + mahalanobis_term - k + log_det_term))
# Fallback to the base class (which handles 'dense' and 'randomized')
_method = "dense" if method == "auto" else method
return super().kl_divergence(other, method=_method, **kwargs)
def __neg__(self) -> InvariantGaussianMeasure:
"""Returns the measure with a negated expectation. Covariance is unchanged."""
return InvariantGaussianMeasure(
self.domain,
self.spectral_variances,
expectation=(
None
if self.has_zero_expectation
else self.domain.negative(self.expectation)
),
)
def __mul__(self, alpha: float) -> InvariantGaussianMeasure:
"""Scales the measure by a scalar alpha."""
new_variances = (alpha**2) * self.spectral_variances
new_expectation = (
None
if self.has_zero_expectation
else self.domain.multiply(alpha, self.expectation)
)
return InvariantGaussianMeasure(
self.domain,
new_variances,
expectation=new_expectation,
)
def __rmul__(self, alpha: float) -> InvariantGaussianMeasure:
return self * alpha
def __truediv__(self, alpha: float) -> InvariantGaussianMeasure:
return self * (1.0 / alpha)
def __add__(self, other: GaussianMeasure) -> GaussianMeasure:
"""Adds two independent Gaussian measures."""
if self.domain != other.domain:
raise ValueError("Measures must be defined on the same domain.")
if isinstance(other, InvariantGaussianMeasure):
new_variances = self.spectral_variances + other.spectral_variances
if self.has_zero_expectation and other.has_zero_expectation:
new_expectation = None
elif self.has_zero_expectation:
new_expectation = other.expectation
elif other.has_zero_expectation:
new_expectation = self.expectation
else:
new_expectation = self.domain.add(self.expectation, other.expectation)
return InvariantGaussianMeasure(
self.domain,
new_variances,
expectation=new_expectation,
)
return super().__add__(other)
def __sub__(self, other: GaussianMeasure) -> GaussianMeasure:
"""Subtracts two independent Gaussian measures."""
if self.domain != other.domain:
raise ValueError("Measures must be defined on the same domain.")
if isinstance(other, InvariantGaussianMeasure):
new_variances = self.spectral_variances + other.spectral_variances
if self.has_zero_expectation and other.has_zero_expectation:
new_expectation = None
elif self.has_zero_expectation:
new_expectation = self.domain.negative(other.expectation)
elif other.has_zero_expectation:
new_expectation = self.expectation
else:
new_expectation = self.domain.subtract(
self.expectation, other.expectation
)
return InvariantGaussianMeasure(
self.domain,
new_variances,
expectation=new_expectation,
)
return super().__sub__(other)
# ------------------------------------------------------#
# Private methods #
# ------------------------------------------------------#
def _kl_sample(self) -> Vector:
"""
Draws a sample using the Karhunen-Loève expansion.
"""
xi = np.random.randn(self.domain.dim)
scaled_components = xi * self._kl_scaling_array
sample_vector = self.domain.from_components(scaled_components)
if self.has_zero_expectation:
return sample_vector
return self.domain.add(sample_vector, self.expectation)
[docs]
class SymmetricHilbertSpace(HilbertSpace, ABC):
"""
An abstract base class for Hilbert spaces of functions spaces on
symmetric manifolds.
The implementation is based on the expansion of elements of the
space in terms of the eigenfunctions of the Laplace-Beltrami
operator.
To inherit from this base class, the user must provide methods
that provide information on the eigenvalues and eigenfunctions
of the Laplace-Beltrami operator, including mappings to and
from the co-ordinate basis. The eigenfunction basis is
necessarily orthogonal, but need not be normalised.
"""
def __init__(self, spatial_dim: int, degree: Degree, dim: int, orthonormal: bool):
"""
Initializes the abstract invariant Lebesgue space.
Args:
spatial_dim: The dimension of the symmetric manifold
degree: The truncation degree.
dim: The dimension of the space.
orthonormal: True if the eigenfunction basis is orthonormal.
"""
self._spatial_dim = spatial_dim
self._degree = degree
self._dim = dim
self._orthonormal = orthonormal
if self._orthonormal:
self._metric = None
self._inverse_metric = None
else:
metric_values = np.fromiter(
(self.laplacian_eigenvector_squared_norm(k) for k in self.indices),
dtype=float,
count=self.dim,
)
self._metric = diags([metric_values], [0])
self._inverse_metric = diags([np.reciprocal(metric_values)], [0])
# Add to SymmetricHilbertSpace
[docs]
@staticmethod
def heat_kernel(scale: float) -> Callable[[float], float]:
"""Returns a heat kernel covariance function: exp(-scale^2 * k)."""
return lambda k: np.exp(-(scale**2) * k)
[docs]
@staticmethod
def sobolev_kernel(order: float, scale: float) -> Callable[[float], float]:
"""Returns a Sobolev kernel covariance function: (1 + scale^2 * k)^-order."""
return lambda k: (1.0 + scale**2 * k) ** (-order)
# ------------------------------------------------------------#
# Properties #
# ------------------------------------------------------------#
@property
def spatial_dimension(self) -> int:
"""The dimension of the symetric manifold."""
return self._spatial_dim
@property
def degree(self) -> Degree:
"""The spectral truncation degree of the space."""
return self._degree
@property
def dim(self) -> int:
return self._dim
@property
def orthonormal(self) -> bool:
"""
True if the eigenfunction basis is normalised
"""
return self._orthonormal
@property
def squared_norms(self) -> np.ndarray:
"""
Returns a vector of the squared eigenvector norms.
"""
if self.orthonormal:
return np.ones(self.dim, dtype=float)
else:
return self._metric.diagonal(k=0)
@property
def indices(self) -> Iterator[Index]:
"""
Returns a list of all the eigenvalue indices
"""
return (self.integer_to_index(i) for i in range(self.dim))
@property
def laplacian(self) -> InvariantLinearAutomorphism:
"""
Returns the Laplacian, Δ, as an operator on the space.
Note that this is an unbounded operator and it should be
used with care.
"""
return self.invariant_automorphism(lambda k: k)
# ------------------------------------------------------------#
# Optimized Base Operator Overrides #
# ------------------------------------------------------------#
[docs]
def identity_operator(self) -> InvariantLinearAutomorphism:
"""
Returns the identity operator on the space.
Overridden to return an InvariantLinearAutomorphism, ensuring that
compositions with other invariant operators preserve the fast-path
diagonal structure.
"""
return InvariantLinearAutomorphism(self, np.ones(self.dim, dtype=float))
[docs]
def zero_operator(self, codomain: Optional[HilbertSpace] = None) -> LinearOperator:
"""
Returns the zero operator.
If the codomain is the space itself (or not provided), it returns an
InvariantLinearAutomorphism for optimal arithmetic operations.
Otherwise, it falls back to the standard abstract zero operator.
"""
codomain = self if codomain is None else codomain
if codomain == self:
return InvariantLinearAutomorphism(self, np.zeros(self.dim, dtype=float))
return super().zero_operator(codomain=codomain)
# ------------------------------------------------------------#
# Abstract methods #
# ------------------------------------------------------------#
@property
@abstractmethod
def gaussian_curvature(self) -> float:
"""
Returns the Gaussian curvature (K) of the symmetric manifold.
"""
[docs]
@abstractmethod
def index_to_integer(self, k: Index) -> int:
"""
Maps an eigenvalue index to an integer.
"""
[docs]
@abstractmethod
def integer_to_index(self, i: int) -> Index:
"""
Maps an integer to the eigenvalue index
"""
[docs]
@abstractmethod
def laplacian_eigenvalue(self, k: Index) -> float:
"""
Returns the eigenvalue of the Laplacian for a given index.
The index `k` can be a single integer (e.g., for a circle) or a
tuple of integers (e.g., for a sphere or torus), depending on the
geometry of the space.
Args:
k: The index of the eigenvalue to return.
"""
[docs]
@abstractmethod
def laplacian_eigenvector_squared_norm(self, k: Index) -> float:
"""
Returns the squared norm of the eigenvalue of the Laplacian for a given index.
The index `k` can be a single integer (e.g., for a circle) or a
tuple of integers (e.g., for a sphere or torus), depending on the
geometry of the space.
Args:
k: The index of the eigenvalue to return.
"""
[docs]
@abstractmethod
def laplacian_eigenvectors_at_point(self, x: Point) -> np.ndarray:
"""
Returns a list of the values of the eigenvectors at a given point
Args:
x: The evaluation point.
"""
[docs]
@abstractmethod
def random_point(self) -> Any:
"""Returns a single random point from the underlying symmetric space."""
[docs]
@abstractmethod
def geodesic_distance(self, p1: Any, p2: Any) -> float:
"""
Returns the shortest distance along the manifold between two points.
Args:
p1: The starting point.
p2: The end point.
"""
[docs]
@abstractmethod
def geodesic_quadrature(
self, p1: Any, p2: Any, n_points: int
) -> Tuple[List[Any], np.ndarray]:
"""
Returns quadrature points and weights for a geodesic between p1 and p2.
Returns:
points: List of manifold coordinates.
weights: Integration weights scaled by the line element.
"""
[docs]
def geodesic_ball_quadrature(
self, center: Any, radius: float, n_points: int
) -> Tuple[List[Any], np.ndarray]:
r"""Return quadrature points and weights for a geodesic ball.
Subclasses may implement geometry-specific quadrature rules. The
weights should include the manifold volume element so that
``sum_i weights[i] * f(points[i])`` approximates
$\int_{B(center, radius)} f(x)\,dV(x)$.
"""
raise NotImplementedError(
"Geodesic-ball quadrature is not implemented for this symmetric space."
)
[docs]
def geodesic_ball_integral(
self,
center: Any,
radius: float,
/,
*,
n_points: Optional[int] = None,
normalize: bool = False,
) -> LinearForm:
r"""Return a linear form for a geodesic-ball integral.
The returned form represents
$\int_{B(center, radius)} u(x)\,dV(x)$. If ``normalize`` is true, it
represents the average over the geodesic ball instead.
Subclasses with exact spectral formulas can override this method.
Otherwise, a subclass-provided ``geodesic_ball_quadrature`` rule is used.
Args:
center: Center point of the geodesic ball.
radius: Geodesic radius in the manifold's distance units.
n_points: Number of quadrature points requested from
``geodesic_ball_quadrature``.
normalize: If true, divide by the quadrature measure of the ball.
"""
if n_points is None:
raise NotImplementedError(
"Automatic geodesic-ball quadrature density is not available. "
"Pass n_points or use a space with an exact implementation."
)
points, weights = self.geodesic_ball_quadrature(center, radius, n_points)
measure = float(np.sum(weights))
components = np.zeros(self.dim)
for point, weight in zip(points, weights):
components += weight * np.asarray(
self.laplacian_eigenvectors_at_point(point)
)
if normalize:
if measure <= 0.0:
raise ValueError("Cannot normalize a zero-measure geodesic ball.")
components /= measure
return LinearForm(self, components=components)
[docs]
def geodesic_ball_average(
self,
center: Any,
radius: float,
/,
*,
n_points: Optional[int] = None,
) -> LinearForm:
r"""Return a linear form for the average over a geodesic ball."""
return self.geodesic_ball_integral(
center,
radius,
n_points=n_points,
normalize=True,
)
[docs]
@abstractmethod
def with_degree(self, degree: Degree) -> SymmetricHilbertSpace:
"""Returns a new instance of the space with a modified truncation degree."""
[docs]
@abstractmethod
def degree_transfer_operator(self, target_degree: Degree) -> LinearOperator:
"""Returns the transfer operator from this space to one with a different degree."""
[docs]
@abstractmethod
def invariant_covariance_function(
self, spectral_variances: np.ndarray
) -> Callable[[np.ndarray], np.ndarray]:
"""
Returns a vectorized function that evaluates the two-point covariance
of an invariant measure as a function of geodesic distance.
"""
[docs]
@abstractmethod
def degree_multiplicity(self, degree: int) -> int:
"""Returns the number of eigenfunctions associated with a given degree."""
[docs]
@abstractmethod
def representative_index(self, degree: int) -> Index:
"""Returns a valid eigenfunction index for the given degree."""
[docs]
@abstractmethod
def project_function(self, f: Callable[Point, Value]) -> Vector:
"""
Returns an element of the space by projecting a given function.
Args:
f: A function that takes a point and returns a value.
"""
# ------------------------------------------------------------#
# Public methods #
# ------------------------------------------------------------#
[docs]
def to_dual(self, x: Vector) -> LinearForm:
cx = self.to_components(x)
if self.orthonormal:
return LinearForm(self, components=cx)
else:
cxp = self._metric @ cx
return LinearForm(self, components=cxp)
[docs]
def from_dual(self, xp: LinearForm) -> Vector:
cxp = xp.components
if self.orthonormal:
return self.from_components(cxp)
else:
cx = self._inverse_metric @ cxp
return self.from_components(cx)
[docs]
def random_points(self, n: int) -> List[Point]:
"""
Returns a list of `n` random points.
Args:
n: The number of random points to generate.
"""
return [self.random_point() for _ in range(n)]
[docs]
def invariant_automorphism(
self, f: Callable[float, float]
) -> InvariantLinearAutomorphism:
"""
Returns an automorphism of the form f(Δ) with f a function
that is well-defined on the spectrum of the Laplacian, Δ.
Args:
f: The function
"""
return InvariantLinearAutomorphism.from_function(self, f)
[docs]
def invariant_gaussian_measure(
self, f: Callable[float, float], /, *, expectation=None
) -> InvariantGaussianMeasure:
"""
Returns a Gaussian measure on the space whose covariance takes the form f(Δ) with f
a function that is well-defined on the spectrum of the Laplacian, Δ.
In order for the covariance to be well-defined, the trace of the covariance operator
must be finite. This condition (in the sense of convergence as the size of the
approximating space being increased) is not checked.
Args:
f: The function
expectation: The expected value for measure. Defaults to zero
Returns:
The measure as an instance of InvariantGaussianMeasure
"""
return InvariantGaussianMeasure.from_function(self, f, expectation=expectation)
[docs]
def norm_scaled_invariant_gaussian_measure(
self, f: Callable[float, float], /, *, expectation=None, std=1.0
) -> InvariantGaussianMeasure:
"""
Returns a Gaussian measure on the space whose covariance takes the form f(Δ) with f
a function that is well-defined on the spectrum of the Laplacian, Δ.
In order for the covariance to be well-defined, the trace of the covariance operator
must be finite. This condition (in the sense of convergence as the size of the
approximating space being increased) is not checked.
The measure's covariance is scaled such that:
E[||x||^2] = std*std + ||E[x]||^2
Args:
f: The function
expectation: The expected value for measure. Defaults to zero
std: The desired standard deviation
Returns:
The measure as an instance of InvariantGaussianMeasure
"""
initial_measure = self.invariant_gaussian_measure(f, expectation=expectation)
return initial_measure.rescale_norm_variance(std)
[docs]
def sobolev_kernel_gaussian_measure(
self, order: float, scale: float, /, *, expectation: Vector = None
):
"""
Returns an invariant Gaussian measure with a Sobolev-type covariance
equal to (1 + scale^2 * Δ)^-order.
Args:
order: Order parameter for the covariance.
scale: Scale parameter for the covariance.
expectation: The expected value for measure. Defaults to zero
"""
return self.invariant_gaussian_measure(
self.sobolev_kernel(order, scale), expectation=expectation
)
[docs]
def norm_scaled_sobolev_kernel_gaussian_measure(
self,
order: float,
scale: float,
/,
*,
expectation: Vector = None,
std: float = 1,
):
"""
Returns an invariant Gaussian measure with a Sobolev-type covariance
proportional to (1 + scale^2 * Δ)^-order.
The measure's covariance is scaled such that the expected value for the
samples norm is equal to the given standard deviation.
Args:
order: Order parameter for the covariance.
scale: Scale parameter for the covariance.
expectation: The expected value for measure. Defaults to zero
std: The desired standard deviation
"""
return self.norm_scaled_invariant_gaussian_measure(
lambda k: (1 + scale**2 * k) ** -order, expectation=expectation, std=std
)
[docs]
def heat_kernel_gaussian_measure(
self, scale: float, /, *, expectation: Vector = None
):
"""
Returns an invariant Gaussian measure with a heat kernel covariance
equal to exp(-scale^2 * Δ).
Args:
scale: Scale parameter for the covariance.
expectation: The expected value for measure. Defaults to zero
"""
return self.invariant_gaussian_measure(
self.heat_kernel(scale), expectation=expectation
)
[docs]
def norm_scaled_heat_kernel_gaussian_measure(
self, scale: float, /, *, expectation: Vector = None, std: float = 1
):
"""
Returns an invariant Gaussian measure with a heat kernel covariance
equal to exp(-scale^2 * Δ).
Args:
scale: Scale parameter for the covariance.
expectation: The expected value for measure. Defaults to zero
std: The desired standard deviation
"""
return self.norm_scaled_invariant_gaussian_measure(
lambda k: np.exp(-(scale**2) * k), expectation=expectation, std=std
)
[docs]
def cluster_points(
self,
points: List[Point],
/,
*,
threshold: Optional[float] = None,
n_clusters: Optional[int] = None,
linkage_method: str = "complete",
) -> List[List[int]]:
"""
Clusters a list of points into interacting blocks based on their
geodesic distance.
This is particularly useful for generating the 'interacting_blocks'
required by localized preconditioners in Bayesian inversions.
Args:
points: A list of points on the symmetric manifold.
threshold: The maximum geodesic distance between points in a cluster.
n_clusters: The exact number of clusters to form (alternative to threshold).
linkage_method: The hierarchical clustering method to use ('complete',
'average', 'single', etc.). Defaults to 'complete' to
ensure clusters remain geographically compact.
Returns:
A list of lists, where each sub-list contains the indices of the
points belonging to a specific cluster.
"""
from scipy.cluster.hierarchy import linkage, fcluster
n = len(points)
if n == 0:
return []
if n == 1:
return [[0]]
if threshold is None and n_clusters is None:
raise ValueError("You must specify either 'threshold' or 'n_clusters'.")
# 1. Compute pairwise geodesic distances (in SciPy's condensed 1D format)
# A condensed matrix is required by the linkage function
distances = np.zeros(n * (n - 1) // 2)
idx = 0
for i in range(n):
for j in range(i + 1, n):
distances[idx] = self.geodesic_distance(points[i], points[j])
idx += 1
# 2. Perform agglomerative hierarchical clustering
Z = linkage(distances, method=linkage_method)
# 3. Extract the flat clusters based on the user's criteria
if n_clusters is not None:
labels = fcluster(Z, t=n_clusters, criterion="maxclust")
else:
labels = fcluster(Z, t=threshold, criterion="distance")
# 4. Group the point indices by their assigned cluster label
blocks = {}
for i, label in enumerate(labels):
blocks.setdefault(label, []).append(i)
return list(blocks.values())
[docs]
def pairs_within_distance(
self, points: List[Point], max_distance: float
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Finds all pairs of points within a given geodesic distance.
Returns arrays of row indices, column indices, and their geodesic distances.
"""
n = len(points)
rows, cols, dists = [], [], []
for i in range(n):
for j in range(n): # Full symmetric sweep
d = self.geodesic_distance(points[i], points[j])
if d <= max_distance:
rows.append(i)
cols.append(j)
dists.append(d)
return np.array(rows), np.array(cols), np.array(dists)
[docs]
def estimate_truncation_degree(
self,
covariance_function: Callable[[float], float],
/,
*,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
) -> int:
"""
Estimates the truncation degree required to capture the expected
energy of a function drawn from a prior measure.
"""
summation = 0.0
degree = 0
err = 1.0
while err > rtol:
# Safety ceiling
if max_degree is not None and degree >= max_degree:
return max_degree
idx = self.representative_index(degree)
lambda_val = self.laplacian_eigenvalue(idx)
variance = covariance_function(lambda_val)
norm_sq = self.laplacian_eigenvector_squared_norm(idx)
multiplicity = self.degree_multiplicity(degree)
term_energy = variance * multiplicity * norm_sq
summation += term_energy
if summation > 0:
err = term_energy / summation
else:
err = 1.0
if err <= rtol:
break
degree += 1
if degree > 100000:
raise RuntimeError("Failed to converge on a stable truncation degree.")
# Safety floor
return max(degree, min_degree)
[docs]
def l2_products_operator(self, weighting_functions: List[Any]) -> LinearOperator:
"""
Creates an operator that computes a vector of L2 inner products.
The action on a function 'u' returns a vector 'd' where d_i = <u, w_i>_L2.
Args:
weighting_functions: A list of vectors representing the weighting kernels.
Returns:
LinearOperator: A mapping from this space to EuclideanSpace(len(weighting_functions)).
"""
return LinearOperator.from_vectors(self, weighting_functions)
[docs]
class AbstractSymmetricLebesgueSpace(HilbertModule, SymmetricHilbertSpace, ABC):
"""
A specialisation for scalar-valued L² function spaces on symmetric manifolds.
To be instantiated, such a class must provide the following additional methods:
vector_multiply
vector_sqrt
"""
[docs]
def gradient_dot_product(self, f: Vector, g: Vector) -> Vector:
"""
Computes the pointwise dot product of the surface gradients of two fields.
"""
L = self.laplacian
fg = self.vector_multiply(f, g)
f_Lg = self.vector_multiply(f, L(g))
g_Lf = self.vector_multiply(g, L(f))
result = L(fg)
self.axpy(-1.0, f_Lg, result)
self.axpy(-1.0, g_Lf, result)
self.ax(0.5, result)
return result
[docs]
def flexural_operator(
self,
flexural_rigidity: Union[Vector, float, int],
poisson_ratio: Union[Vector, float, int],
buoyancy_factor: Union[Vector, float, int],
) -> LinearOperator:
r"""
Constructs the generalized 4th-order variable-coefficient flexure operator
for a thin elastic shell on a Riemannian manifold.
This operator models the normal deflection of a shell subjected to a transverse
load. It accounts for spatially varying material properties without requiring
explicit tensor calculus by utilizing Bochner-style identities to express
the inner product of Hessians purely in terms of Laplacians and surface gradients.
The operator evaluates:
Op(w) = L(D * L(w)) - tr(Hess(D_eff) * Hess(w)) + rho_g * w
Where:
* w is the normal deflection.
* D is the flexural rigidity.
* D_eff is the effective rigidity: D * (1 - nu).
* nu is Poisson's ratio.
* rho_g is the foundation restoring force (buoyancy factor).
* L() is the Laplace-Beltrami operator.
Mathematical Details (The Hessian Calculation):
---------------------------------------------
Because the framework is built on the spectral properties of the Laplacian,
it does not compute Hessians directly. Instead, it uses a well-known identity
for the inner product of two Hessians on a 2D manifold:
tr(Hess(A) * Hess(B)) = 0.5 * L(grad(A) . grad(B))
- 0.5 * (grad(L(A)) . grad(B))
- 0.5 * (grad(A) . grad(L(B)))
- K * (grad(A) . grad(B))
Here, K is the Gaussian curvature of the manifold. In flat Euclidean space,
derivatives commute and K = 0. On a curved manifold (like a sphere), derivatives
do not commute. The Gaussian curvature K acts as the precise geometric penalty
required to correct for this non-commutativity. The method `gradient_dot_product`
is used to evaluate the (grad(A) . grad(B)) terms internally.
Args:
flexural_rigidity: The flexural rigidity field (D). Can be a spatially
varying Vector in this space or a constant scalar.
poisson_ratio: Poisson's ratio (nu). Can be a spatially varying Vector
or a constant scalar.
buoyancy_factor: The density contrast or foundation restoring force (rho * g).
Can be a spatially varying Vector or a constant scalar.
Returns:
LinearOperator: A self-adjoint linear operator mapping a deflection field
to its corresponding load field.
"""
L = self.laplacian
K = self.gaussian_curvature
if isinstance(flexural_rigidity, (float, int)):
D = self.project_function(lambda _: flexural_rigidity)
else:
D = flexural_rigidity
if isinstance(poisson_ratio, (float, int)):
nu = self.project_function(lambda _: poisson_ratio)
else:
nu = poisson_ratio
if isinstance(buoyancy_factor, (float, int)):
rho_g = self.project_function(lambda _: buoyancy_factor)
else:
rho_g = buoyancy_factor
one_vec = self.project_function(lambda _: 1.0)
one_minus_nu = self.subtract(one_vec, nu)
D_eff = self.vector_multiply(D, one_minus_nu)
delta_D_eff = L(D_eff)
def apply_operator(w: Any) -> Any:
delta_w = L(w)
D_delta_w = self.vector_multiply(D, delta_w)
principal_term = L(D_delta_w)
grad_Deff_w = self.gradient_dot_product(D_eff, w)
poisson_penalty = self.multiply(0.5, L(grad_Deff_w))
term2_base = self.gradient_dot_product(delta_D_eff, w)
self.axpy(-0.5, term2_base, poisson_penalty)
term3_base = self.gradient_dot_product(D_eff, delta_w)
self.axpy(-0.5, term3_base, poisson_penalty)
self.axpy(-K, grad_Deff_w, poisson_penalty)
result = principal_term
self.axpy(-1.0, poisson_penalty, result)
if isinstance(rho_g, (float, int)):
self.axpy(float(rho_g), w, result)
else:
restoring_force = self.vector_multiply(rho_g, w)
self.axpy(1.0, restoring_force, result)
return result
return LinearOperator.self_adjoint(self, apply_operator)
[docs]
def inverse_flexural_operator(
self,
flexural_rigidity: Union[Vector, float, int],
poisson_ratio: Union[Vector, float, int],
buoyancy_factor: Union[Vector, float, int],
/,
*,
baseline_rigidity: Optional[float] = None,
baseline_buoyancy: Optional[float] = None,
solver: Optional[IterativeLinearSolver] = None,
) -> LinearOperator:
r"""
Constructs the inverse of the generalized variable-coefficient flexure operator.
This method acts as an intelligent factory, mapping a spatial load field to
the resulting normal deflection. It utilizes a dual-path execution strategy
depending on the spatial variance of the input parameters:
1. **Analytical Fast-Path:** If $D$, $\nu$, and $\rho_g$ are all constant scalars,
the Poisson penalty vanishes ($\nabla D_{eff} = 0$), and the operator simplifies
exactly to $D \Delta^2 + \rho_g$. The method returns an ultra-fast, exact
`InvariantLinearAutomorphism` by reciprocating the spectral eigenvalues.
2. **Iterative Slow-Path:** If any parameter is a spatially varying field,
the method wraps a preconditioned Conjugate Gradient (CG) iterative solver
inside a standard `LinearOperator` interface.
To accelerate the iterative solver, an invariant preconditioner is built using
baseline scalar values for rigidity and buoyancy. If these baselines are not
explicitly provided, the solver automatically computes the spatial means of
the provided fields via $L^2$ projection.
Args:
flexural_rigidity: The flexural rigidity field $D$ or a constant scalar.
poisson_ratio: Poisson's ratio $\nu$ or a constant scalar.
buoyancy_factor: The restoring force $\rho_g$ or a constant scalar.
baseline_rigidity: An optional scalar guess for the average rigidity,
used to construct the spectral preconditioner.
baseline_buoyancy: An optional scalar guess for the average buoyancy,
used to construct the spectral preconditioner.
solver: An optional pre-configured `IterativeLinearSolver` instance
(e.g., to inject specific tolerances or progress callbacks).
Defaults to a standard `CGSolver`.
Returns:
LinearOperator: A self-adjoint linear operator mapping a load field
to its corresponding deflection field.
"""
all_scalars = all(
isinstance(param, (float, int))
for param in (flexural_rigidity, poisson_ratio, buoyancy_factor)
)
if all_scalars:
D = float(flexural_rigidity)
rho_g = float(buoyancy_factor)
return self.invariant_automorphism(lambda k: 1.0 / (D * (k**2) + rho_g))
flexural_operator = self.flexural_operator(
flexural_rigidity, poisson_ratio, buoyancy_factor
)
def compute_average(f: Union[Vector, float, int]) -> float:
if isinstance(f, (float, int)):
return float(f)
else:
one = self.project_function(lambda _: 1.0)
num = self.inner_product(f, one)
den = self.squared_norm(one)
return num / den
if baseline_rigidity is not None:
D0 = baseline_rigidity
else:
D0 = compute_average(flexural_rigidity)
if baseline_buoyancy is not None:
rho_0 = baseline_buoyancy
else:
rho_0 = compute_average(buoyancy_factor)
preconditioner = self.invariant_automorphism(
lambda k: 1.0 / (D0 * (k**2) + rho_0)
)
_solver = CGSolver() if solver is None else solver
if not isinstance(_solver, IterativeLinearSolver):
raise ValueError(
"The solver prodided must be an instance of IterativeLinearSolver"
)
return _solver(flexural_operator, preconditioner=preconditioner)
[docs]
class SymmetricSobolevSpace(MassWeightedHilbertModule, AbstractSymmetricLebesgueSpace):
"""
The Sobolev space Hˢ constructed over a symmetric Lebesgue space.
This implementation leverages the mass-weighting framework to ensure that
the inner product and dual mappings correctly account for the smoothness
order and scale.
"""
def __init__(
self,
lebesgue_space: AbstractSymmetricLebesgueSpace,
order: float,
scale: float,
/,
*,
safe: Optional[bool] = True,
) -> None:
"""
Args:
lebesgue_space: The underlying L² space (which provides Δ eigenvalues).
order: The Sobolev smoothness order (s).
scale: The Sobolev length-scale (κ).
safe: If true, the class checks for mathematical correctness of operations
where possible.
"""
self._order = order
self._scale = scale
self._safe = safe
mass_operator = lebesgue_space.invariant_automorphism(self.sobolev_function)
inverse_mass_operator = mass_operator.inverse
MassWeightedHilbertModule.__init__(
self, lebesgue_space, mass_operator, inverse_mass_operator
)
AbstractSymmetricLebesgueSpace.__init__(
self,
lebesgue_space.spatial_dimension,
lebesgue_space.degree,
lebesgue_space.dim,
False,
)
@property
def order(self) -> float:
"""The Sobolev order."""
return self._order
@property
def scale(self) -> float:
"""The Sobolev length-scale."""
return self._scale
@property
def safe(self) -> bool:
"""
True if mathematical checks are being applied.
"""
return self._safe
[docs]
@abstractmethod
def with_order(self, order: float) -> SymmetricSobolevSpace:
"""
Returns a new instance of the exact same space but with a modified
Sobolev order.
"""
[docs]
def order_inclusion_operator(self, target_order: float) -> LinearOperator:
"""Returns the inclusion operator from this space to one of a lower order."""
if self.safe and target_order > self.order:
raise ValueError(
"Target order must be less than or equal to the current order."
)
codomain = self.with_order(target_order)
underlying_identity = self.underlying_space.identity_operator()
return LinearOperator.from_formal_adjoint(self, codomain, underlying_identity)
[docs]
def sobolev_function(self, lambda_val) -> float:
"""
Returns the value of the Sobolev function associated with the space.
"""
return (1.0 + (self.scale**2) * lambda_val) ** self.order
[docs]
def dirac(self, point: Point) -> LinearForm:
"""
Returns the Dirac measure at the chosen point.
"""
if self.safe and self.order <= self.spatial_dimension / 2:
raise NotImplementedError("The Dirac measure is not defined on this space.")
cxp = self.laplacian_eigenvectors_at_point(point)
return LinearForm(self, components=cxp)
[docs]
def dirac_representation(self, point) -> Vector:
"""
Returns the representation of the Dirac measure at the chosen point.
"""
return self.from_dual(self.dirac(point))
[docs]
def point_evaluation_operator(self, points: List[Any]) -> LinearOperator:
"""
Returns a linear operator that evaluates a function at a list of points.
The resulting operator maps a function (a vector in this space) to a
vector in Euclidean space containing the function's values at the
specified locations. This is the primary mechanism for creating a
forward operator that links a function field to a set of discrete
measurements.
Args:
points: A list of points at which to evaluate the functions.
"""
if self.safe and self.order <= self.spatial_dimension / 2:
raise NotImplementedError("Point evaluation is not defined on this space")
dim = len(points)
matrix = np.zeros((dim, self.dim))
for i, point in enumerate(points):
cp = self.dirac(point).components
matrix[i, :] = cp
return LinearOperator.from_matrix(
self, EuclideanSpace(dim), matrix, galerkin=True
)
[docs]
def point_value_scaled_invariant_gaussian_measure(
self,
f: Callable[[float], float],
/,
*,
expectation: Vector = None,
std: float = 1,
):
"""
Returns an invariant Gaussian measure with covariance proportional to f(Δ),
where f must be such that this operator is trace-class.
The covariance of the operator is scaled such that the standard deviation
of the point-wise values are equal to the given std.
Args:
f: A real-valued function that is well-defined on the spectrum
of the Laplacian, Δ.
std: The desired standard deviation for the pointwise values.
Raises:
NotImplementedError: If the Sobolev order is less than n/2, with n the spatial dimension.
Notes:
This method applies for symmetric spaces an invariant measures. As a result, the
pointwise variance is the same at all points. Internally, a random point is chosen
to carry out the normalisation.
"""
unscaled_measure = InvariantGaussianMeasure.from_function(
self, f, expectation=expectation
)
return unscaled_measure.rescale_directional_variance(
self.dirac_representation(self.random_point()), std
)
[docs]
def point_value_scaled_sobolev_kernel_gaussian_measure(
self,
order: float,
scale: float,
/,
*,
expectation: Vector = None,
std: float = 1,
):
"""
Returns an invariant Gaussian measure with a Sobolev-type covariance
proportional to (1 + scale^2 * Δ)^-order.
The covariance of the operator is scaled such that the standard deviation
of the point-wise values are equal to the given amplitude.
Args:
order: Order parameter for the covariance.
scale: Scale parameter for the covariance.
std: The desired standard deviation for the pointwise values.
"""
return self.point_value_scaled_invariant_gaussian_measure(
lambda k: (1 + scale**2 * k) ** -order, expectation=expectation, std=std
)
[docs]
def point_value_scaled_heat_kernel_gaussian_measure(
self, scale: float, /, *, expectation: Vector = None, std: float = 1
):
"""
Returns an invariant Gaussian measure with a heat-kernel covariance
proportional to exp(-scale^2 * Δ).
The covariance of the operator is scaled such that the standard deviation
of the point-wise values are equal to the given amplitude.
Args:
scale: Scale parameter for the covariance.
std: The desired standard deviation for the pointwise values.
"""
return self.point_value_scaled_invariant_gaussian_measure(
lambda k: np.exp(-(scale**2) * k), expectation=expectation, std=std
)
[docs]
def geodesic_integral(
self, p1: Point, p2: Point, /, *, n_points: Optional[int] = None
) -> LinearForm:
"""
Returns a linear functional representing the line integral of a function
along a geodesic path.
This method approximates the integral :math:`\\int_{\\gamma} u(s) ds`, where
:math:`\\gamma` is the shortest path (geodesic) connecting points `p1` and `p2`.
The integral is represented as a :class:`LinearForm` in the dual space,
constructed by summing weighted point evaluations (Dirac measures) along
the path.
For Hilbert spaces with a specified :attr:`scale`, the method can
automatically determine the required quadrature density to resolve the
smooth features of the space's sensitivity kernels.
Args:
p1: The starting point of the geodesic. The type is manifold-dependent
(e.g., float for :class:`Circle`, tuple for :class:`Sphere`).
p2: The end point of the geodesic.
n_points (int, optional): The number of Gauss-Legendre quadrature points.
If None, it is heuristically determined as:
:math:`n = \\lceil (\\text{arc\\_length} / \\text{scale}) \\times 2 \\rceil`.
This ensures at least two points per characteristic length-scale,
providing stable sampling of the sensitivity kernel. Defaults to None.
Returns:
LinearForm: A linear functional whose action on a vector `u` computes
the approximated line integral.
Raises:
NotImplementedError: If the Sobolev order :math:`s` is less than or
equal to half the spatial dimension :math:`n/2`.
"""
if self.safe and self.order <= self.spatial_dimension / 2:
raise NotImplementedError(
f"Order {self.order} is too low for point evaluation on a "
f"{self.spatial_dimension}D manifold."
)
if n_points is None:
_, temp_weights = self.geodesic_quadrature(p1, p2, n_points=2)
arc_length = np.sum(temp_weights)
n_points = int(np.ceil((arc_length / self.scale) * 2.0))
n_points = max(2, n_points)
points, weights = self.geodesic_quadrature(p1, p2, n_points)
total_components = np.zeros(self.dim)
for pt, weight in zip(points, weights):
total_components += weight * self.dirac(pt).components
return LinearForm(self, components=total_components)
[docs]
def geodesic_integral_representation(
self, p1: Point, p2: Point, /, *, n_points: Optional[int] = None
) -> Vector:
"""
Returns the Riesz representation (sensitivity kernel) of the line integral.
This maps the LinearForm (the integral functional) back into the
primal Hilbert space. Visualizing this vector reveals the "sensitivity"
of the line integral to perturbations at different locations in the domain.
Args:
p1, p2: Start and end points of the geodesic.
n_points: Number of quadrature points.
"""
integral_form = self.geodesic_integral(p1, p2, n_points=n_points)
return self.from_dual(integral_form)
[docs]
def path_average_operator(self, paths, /, *, n_points=None):
"""
Constructs a tomographic operator mapping a function field to its
line integrals along a set of geodesic paths.
Note: Despite the name, this operator returns the line integral
(the dual pairing of the function with the path functional) rather
than a normalized average, unless the user manually scales the forms.
This corresponds to the 'path average' convention often used in
seismic and atmospheric tomography.
Args:
paths (List[Tuple[Any, Any]]): A list of start and end point pairs
defining the geodesics.
n_points (int, optional): The number of quadrature points per path.
If None, the heuristic based on the Sobolev scale is used.
Returns:
LinearOperator: An operator mapping Space -> EuclideanSpace(len(paths)).
The adjoint of this operator performs the 'back-projection'
mapping data residuals into the function space.
"""
path_forms = [
self.geodesic_integral(p1, p2, n_points=n_points) for p1, p2 in paths
]
return LinearOperator.from_linear_forms(path_forms)
[docs]
def random_source_receiver_paths(
self, n_sources: int, n_receivers: int
) -> List[Tuple[Any, Any]]:
"""
Generates a list of source-receiver pairs by connecting every source to
every receiver.
This method uses the existing :meth:`random_points` logic to generate
coordinates appropriate for the specific symmetric space. For a set
of S sources and R receivers, this returns a list of S*R paths.
Args:
n_sources: The number of random source locations to generate.
n_receivers: The number of random receiver locations to generate.
Returns:
List[Tuple[Any, Any]]: A list of tuples, where each tuple contains
a (source, receiver) pair.
"""
# Generate the points using the existing base class method
sources = self.random_points(n_sources)
receivers = self.random_points(n_receivers)
# Create the full-mesh network
paths = []
for src in sources:
for rec in receivers:
paths.append((src, rec))
return paths
[docs]
def cluster_points(
self,
points: List[Point],
/,
*,
threshold: Optional[float] = None,
n_clusters: Optional[int] = None,
linkage_method: str = "complete",
) -> List[List[int]]:
"""
Clusters a list of points into interacting blocks based on their
geodesic distance.
This is particularly useful for generating the 'interacting_blocks'
required by localized preconditioners in Bayesian inversions.
Args:
points: A list of points on the symmetric manifold.
threshold: The maximum geodesic distance between points in a cluster.
n_clusters: The exact number of clusters to form (alternative to threshold).
linkage_method: The hierarchical clustering method to use ('complete',
'average', 'single', etc.). Defaults to 'complete' to
ensure clusters remain geographically compact.
Returns:
A list of lists, where each sub-list contains the indices of the
points belonging to a specific cluster.
"""
return self.underlying_space.cluster_points(
points,
threshold=threshold,
n_clusters=n_clusters,
linkage_method=linkage_method,
)
[docs]
def distance_localized_preconditioner(
self,
prior_measure: InvariantGaussianMeasure,
points: List[Point],
max_distance: float,
/,
*,
data_error_measure: Optional[GaussianMeasure] = None,
apply_taper: bool = False,
parallel: bool = False,
n_jobs: int = -1,
) -> LinearOperator:
"""
Builds a highly specialized, ultra-fast sparse preconditioner for point
evaluation problems when the prior measure is invariant.
This exploits the property that the two-point covariance depends solely
on the geodesic distance. It maps out the 1D covariance function along
a deterministic path, interpolates it, optionally applies a Gaspari-Cohn taper to
ensure positive-definiteness, and populates a sparse normal matrix.
If max_distance is set to 0.0, it bypasses the distance calculations
and instantly builds a purely diagonal (Jacobi) preconditioner.
Args:
prior_measure: The invariant prior Gaussian measure.
points: The list of observation points.
max_distance: The geodesic distance beyond which covariance is assumed
to be zero (enforces sparsity). Set to 0.0 for a pure diagonal.
data_error_measure: The Gaussian measure describing the data noise. Can be None
if the noise is already folded into the prior_measure.
apply_taper: If true, applies Gaspari-Cohn taper.
parallel: If True, computes the error measure diagonal in parallel.
n_jobs: Number of CPU cores to use if parallel=True.
Returns:
A LinearOperator representing the inverse of the approximated sparse normal matrix.
"""
n_data = len(points)
data_space = EuclideanSpace(n_data)
# =======================================================
# SPECIAL CASE: Purely Diagonal (Jacobi) Preconditioner
# =======================================================
if max_distance <= 0.0:
p1 = self.random_point()
rep1 = self.dirac_representation(p1)
q_rep1 = prior_measure.covariance(rep1)
prior_var = float(self.inner_product(rep1, q_rep1))
if data_error_measure is not None:
noise_variance = data_error_measure.covariance.extract_diagonal(
galerkin=True, parallel=parallel, n_jobs=n_jobs
)
inv_diag = 1.0 / (prior_var + noise_variance)
else:
inv_diag = 1.0 / prior_var
def apply_diag_preconditioner(x):
c_vec = data_space.to_components(x)
return data_space.from_components(c_vec * inv_diag)
return LinearOperator(
data_space,
data_space,
apply_diag_preconditioner,
adjoint_mapping=apply_diag_preconditioner,
)
# =======================================================
# STANDARD CASE: Distance-Localized Sparse Preconditioner
# =======================================================
def gaspari_cohn_vectorized(d_array, c_val):
"""Vectorized Gaspari-Cohn taper for lightning-fast array evaluations."""
z = d_array / c_val
taper = np.zeros_like(z)
mask1 = z <= 1.0
z1 = z[mask1]
taper[mask1] = (
1.0
- (5.0 / 3.0) * z1**2
+ (5.0 / 8.0) * z1**3
+ (1.0 / 2.0) * z1**4
- (1.0 / 4.0) * z1**5
)
mask2 = (z > 1.0) & (z <= 2.0)
z2 = z[mask2]
taper[mask2] = (
4.0
- 5.0 * z2
+ (5.0 / 3.0) * z2**2
+ (5.0 / 8.0) * z2**3
- (1.0 / 2.0) * z2**4
+ (1.0 / 12.0) * z2**5
- (2.0 / 3.0) / z2
)
return taper
row_indices, col_indices, dists = self.pairs_within_distance(
points, max_distance
)
# Get the exact, vectorized covariance function from the manifold
cov_evaluator = self.invariant_covariance_function(
prior_measure.spectral_variances
)
raw_vals = cov_evaluator(dists)
# The Gaspari-Cohn taper remains unchanged
c_taper = max_distance / 2.0
if apply_taper:
tapers = gaspari_cohn_vectorized(dists, c_taper)
values = raw_vals * tapers
else:
values = raw_vals
H_sparse = sps.coo_matrix(
(values, (row_indices, col_indices)), shape=(n_data, n_data)
)
if data_error_measure is not None:
noise_variance = data_error_measure.covariance.extract_diagonal(
galerkin=True, parallel=parallel, n_jobs=n_jobs
)
R_sparse = sps.diags(noise_variance)
H_approx = (H_sparse + R_sparse).tocsc()
else:
H_approx = H_sparse.tocsc()
splu_solver = splinalg.splu(H_approx)
def apply_sparse_preconditioner(x):
c_vec = data_space.to_components(x)
c_solved = splu_solver.solve(c_vec)
return data_space.from_components(c_solved)
return LinearOperator(
data_space,
data_space,
apply_sparse_preconditioner,
adjoint_mapping=apply_sparse_preconditioner,
)
[docs]
def degree_transfer_operator(self, target_degree: Degree) -> LinearOperator:
codomain = self.with_degree(target_degree)
lebesgue_inclusion = self.underlying_space.degree_transfer_operator(
target_degree
)
return LinearOperator.from_formal_adjoint(self, codomain, lebesgue_inclusion)
[docs]
def l2_products_operator(self, weighting_functions: List[Any]) -> LinearOperator:
"""
Creates an operator that computes a vector of L2 inner products.
The action on a function 'u' returns a vector 'd' where d_i = <u, w_i>_L2.
Args:
weighting_functions: A list of vectors representing the weighting kernels.
Returns:
LinearOperator: A mapping from this space to EuclideanSpace(len(weighting_functions)).
"""
l2_operator = self.underlying_space.l2_products_operator(weighting_functions)
return LinearOperator.from_formal_adjoint(
self, l2_operator.codomain, l2_operator
)
[docs]
def flexural_operator(
self,
flexural_rigidity: Union[Vector, float, int],
poisson_ratio: Union[Vector, float, int],
buoyancy_factor: Union[Vector, float, int],
) -> LinearOperator:
r"""
Constructs the generalized 4th-order variable-coefficient flexure operator
for a thin elastic shell on a Riemannian manifold.
This operator models the normal deflection of a shell subjected to a transverse
load. It accounts for spatially varying material properties without requiring
explicit tensor calculus by utilizing Bochner-style identities to express
the inner product of Hessians purely in terms of Laplacians and surface gradients.
Mathematically, the operator evaluates:
$$ Op(w) = \Delta(D \Delta w) - \text{tr}(\text{Hess}(D(1-\nu)) \text{Hess}(w)) + \rho_g w $$
Where:
* $w$ is the normal deflection.
* $D$ is the flexural rigidity.
* $\nu$ is Poisson's ratio.
* $\rho_g$ is the foundation restoring force (buoyancy factor).
* $K$ is the Gaussian curvature of the underlying manifold (handled internally).
Args:
flexural_rigidity: The flexural rigidity field $D$. Can be a spatially
varying Vector in this space or a constant scalar.
poisson_ratio: Poisson's ratio $\nu$. Can be a spatially varying Vector
or a constant scalar.
buoyancy_factor: The density contrast or foundation restoring force $\rho_g$.
Can be a spatially varying Vector or a constant scalar.
Returns:
LinearOperator: A self-adjoint linear operator mapping a deflection field
to its corresponding load field.
"""
l2_operator = self.underlying_space.flexural_operator(
flexural_rigidity, poisson_ratio, buoyancy_factor
)
return LinearOperator.from_formally_self_adjoint(self, l2_operator)
[docs]
def inverse_flexural_operator(
self,
flexural_rigidity: Union[Vector, float, int],
poisson_ratio: Union[Vector, float, int],
buoyancy_factor: Union[Vector, float, int],
/,
*,
baseline_rigidity: Optional[float] = None,
baseline_buoyancy: Optional[float] = None,
solver: Optional[IterativeLinearSolver] = None,
) -> LinearOperator:
r"""
Constructs the inverse of the generalized variable-coefficient flexure operator.
This method acts as an intelligent factory, mapping a spatial load field to
the resulting normal deflection. It utilizes a dual-path execution strategy
depending on the spatial variance of the input parameters:
1. **Analytical Fast-Path:** If $D$, $\nu$, and $\rho_g$ are all constant scalars,
the Poisson penalty vanishes ($\nabla D_{eff} = 0$), and the operator simplifies
exactly to $D \Delta^2 + \rho_g$. The method returns an ultra-fast, exact
`InvariantLinearAutomorphism` by reciprocating the spectral eigenvalues.
2. **Iterative Slow-Path:** If any parameter is a spatially varying field,
the method wraps a preconditioned Conjugate Gradient (CG) iterative solver
inside a standard `LinearOperator` interface.
To accelerate the iterative solver, an invariant preconditioner is built using
baseline scalar values for rigidity and buoyancy. If these baselines are not
explicitly provided, the solver automatically computes the spatial means of
the provided fields via $L^2$ projection.
Args:
flexural_rigidity: The flexural rigidity field $D$ or a constant scalar.
poisson_ratio: Poisson's ratio $\nu$ or a constant scalar.
buoyancy_factor: The restoring force $\rho_g$ or a constant scalar.
baseline_rigidity: An optional scalar guess for the average rigidity,
used to construct the spectral preconditioner.
baseline_buoyancy: An optional scalar guess for the average buoyancy,
used to construct the spectral preconditioner.
solver: An optional pre-configured `IterativeLinearSolver` instance
(e.g., to inject specific tolerances or progress callbacks).
Defaults to a standard `CGSolver`.
Returns:
LinearOperator: A self-adjoint linear operator mapping a load field
to its corresponding deflection field.
"""
l2_operator = self.underlying_space.inverse_flexural_operator(
flexural_rigidity,
poisson_ratio,
buoyancy_factor,
baseline_rigidity=baseline_rigidity,
baseline_buoyancy=baseline_buoyancy,
solver=solver,
)
return LinearOperator.from_formal_adjoint(
self, l2_operator.codomain, l2_operator
)
# ------------------------------------------------------- #
# Methods defered to the Lebesgue space #
# ------------------------------------------------------- #
@property
def gaussian_curvature(self) -> float:
return self.underlying_space.gaussian_curvature
[docs]
def index_to_integer(self, k: Index) -> int:
return self.underlying_space.index_to_integer(k)
[docs]
def integer_to_index(self, i: int) -> Index:
return self.underlying_space.integer_to_index(i)
[docs]
def laplacian_eigenvalue(self, k: Index) -> float:
return self.underlying_space.laplacian_eigenvalue(k)
[docs]
def laplacian_eigenvector_squared_norm(self, k: Index) -> float:
l2_norm_sq = self.underlying_space.laplacian_eigenvector_squared_norm(k)
lambda_k = self.laplacian_eigenvalue(k)
weight = self.sobolev_function(lambda_k)
return l2_norm_sq * weight
[docs]
def laplacian_eigenvectors_at_point(self, x: Point) -> List[Value]:
return self.underlying_space.laplacian_eigenvectors_at_point(x)
[docs]
def random_point(self) -> Point:
return self.underlying_space.random_point()
[docs]
def geodesic_distance(self, p1: Point, p2: Point) -> float:
return self.underlying_space.geodesic_distance(p1, p2)
[docs]
def geodesic_quadrature(
self, p1: Any, p2: Any, n_points: int
) -> Tuple[List[Any], np.ndarray]:
return self.underlying_space.geodesic_quadrature(p1, p2, n_points)
[docs]
def geodesic_ball_quadrature(
self, center: Any, radius: float, n_points: int
) -> Tuple[List[Any], np.ndarray]:
return self.underlying_space.geodesic_ball_quadrature(center, radius, n_points)
[docs]
def geodesic_ball_integral(
self,
center: Any,
radius: float,
/,
*,
n_points: Optional[int] = None,
normalize: bool = False,
) -> LinearForm:
try:
underlying_form = self.underlying_space.geodesic_ball_integral(
center,
radius,
n_points=n_points,
normalize=normalize,
)
except NotImplementedError:
return super().geodesic_ball_integral(
center,
radius,
n_points=n_points,
normalize=normalize,
)
return LinearForm(self, components=underlying_form.components.copy())
[docs]
def pairs_within_distance(
self, points: List[Point], max_distance: float
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
return self.underlying_space.pairs_within_distance(points, max_distance)
[docs]
def invariant_covariance_function(
self, spectral_variances: np.ndarray
) -> Callable[[np.ndarray], np.ndarray]:
return self.underlying_space.invariant_covariance_function(spectral_variances)
[docs]
def degree_multiplicity(self, degree: int) -> int:
return self.underlying_space.degree_multiplicity(degree)
[docs]
def representative_index(self, degree: int) -> Index:
return self.underlying_space.representative_index(degree)
def __eq__(self, other: object) -> bool:
"""
Checks for mathematical equality with another Sobolev spaces.
"""
if not isinstance(other, SymmetricSobolevSpace):
return NotImplemented
check1 = self.underlying_space == other.underlying_space
check2 = self.order == other.order
check3 = self.scale == other.scale
return check1 and check2 and check3