Source code for pygeoinf.symmetric_space.symmetric_space

"""
Provides an abstract framework for function spaces on symmetric manifolds.

This module offers an abstract framework for defining Hilbert spaces of
functions on symmetric spaces (like spheres or tori). The core design
leverages the spectral properties of the Laplace-Beltrami operator (Δ).

By inheriting from these base classes and implementing a small number of abstract
methods a concrete class can automatically gain a rich set of tools for defining
invariant operators and probability measures.
"""

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import (
    Callable,
    Any,
    List,
    Optional,
    TypeAlias,
    Iterator,
    Tuple,
    Union,
    Literal,
)

import numpy as np
from scipy.sparse import diags
import scipy.sparse as sps
import scipy.sparse.linalg as splinalg

from pygeoinf.hilbert_space import (
    HilbertSpace,
    HilbertModule,
    Vector,
    MassWeightedHilbertModule,
    EuclideanSpace,
)
from pygeoinf.linear_operators import LinearOperator, DiagonalSparseMatrixLinearOperator
from pygeoinf.linear_forms import LinearForm
from pygeoinf.gaussian_measure import GaussianMeasure

from pygeoinf.affine_operators import AffineOperator

from pygeoinf.linear_solvers import IterativeLinearSolver, LinearSolver, CGSolver
from pygeoinf.direct_sum import BlockDiagonalLinearOperator

# Alias for the index for the eigenvalues or eigenfunctions
Index: TypeAlias = int | tuple[int, ...]

# Alias for the truncation degree
Degree: TypeAlias = int | tuple[int, ...]

# Alias for a point within the symmetric manifold
Point: TypeAlias = float | tuple[float, ...]

# Alias for the value type of the fields
Value: TypeAlias = float | np.ndarray


[docs] class InvariantLinearAutomorphism(DiagonalSparseMatrixLinearOperator): """ A class for LinearOperators on SymmetricHilbertSpaces that are invariant under the symmetry group. Such operators can be expressed as a function of the Laplace-Beltrami operator and are diagonal within the eigenfunction basis. """ def __init__( self, domain: SymmetricHilbertSpace, eigenvalues: np.ndarray, ): """ Args: domain: The domain of the operator eigenvalues: A vector of the operator's eigenvalues """ diagonals = (eigenvalues.reshape(1, -1), [0]) super().__init__(domain, domain, diagonals)
[docs] @staticmethod def from_index_function( domain: SymmetricHilbertSpace, g: Callable[Index, float] ) -> InvariantLinearAutomorphism: """ Returns an invariant linear automorphism on a symmetric Hilbert space of the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. Here the function, f, is expressed implicitly as a function, g, of the eigenvalue index. Args: domain: The domain of the operator g: The function expressed in terms of the eigenvalue index """ eigenvalues = np.array([g(index) for index in domain.indices], dtype=float) return InvariantLinearAutomorphism(domain, eigenvalues)
[docs] @staticmethod def from_function( domain: SymmetricHilbertSpace, f: Callable[float, float] ) -> InvariantLinearAutomorphism: """ Returns an invariant linear automorphism on a symmetric Hilbert space of the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. Args: domain: The domain of the operator f: The function """ return InvariantLinearAutomorphism.from_index_function( domain, lambda k: f(domain.laplacian_eigenvalue(k)) )
@property def eigenvalues(self) -> np.ndarray: """Returns the operator's eigenvalues (the diagonal multipliers).""" return self.extract_diagonal(galerkin=False) @property def trace(self) -> float: """ Returns the trace of the automorphism. """ return np.sum(self.eigenvalues) @property def inverse(self) -> InvariantLinearAutomorphism: """ Returns the inverse of the invariant automorphism. Since the operator is diagonal in the spectral basis, the inverse is formed by taking the reciprocal of the eigenvalues. """ current_eigenvalues = self.eigenvalues if np.any(current_eigenvalues == 0): raise ValueError("Cannot invert an operator with zero eigenvalues.") return InvariantLinearAutomorphism( self.domain, np.reciprocal(current_eigenvalues) ) def __neg__(self) -> InvariantLinearAutomorphism: current_diagonal = self.eigenvalues return InvariantLinearAutomorphism(self.domain, -current_diagonal) def __mul__(self, alpha: float) -> InvariantLinearAutomorphism: current_diagonal = self.eigenvalues return InvariantLinearAutomorphism(self.domain, alpha * current_diagonal) def __rmul__(self, alpha: float) -> InvariantLinearAutomorphism: return self * alpha def __add__( self, other: InvariantLinearAutomorphism | LinearOperator ) -> InvariantLinearAutomorphism | LinearOperator: if isinstance(other, InvariantLinearAutomorphism): if self.domain != other.domain: raise ValueError("Domains must match.") my_diagonal = self.eigenvalues other_diagonal = other.eigenvalues return InvariantLinearAutomorphism( self.domain, my_diagonal + other_diagonal ) return super().__add__(other) def __sub__( self, other: InvariantLinearAutomorphism | LinearOperator ) -> InvariantLinearAutomorphism | LinearOperator: if isinstance(other, InvariantLinearAutomorphism): if self.domain != other.domain: raise ValueError("Domains must match.") my_diagonal = self.eigenvalues other_diagonal = other.eigenvalues return InvariantLinearAutomorphism( self.domain, my_diagonal - other_diagonal ) return super().__sub__(other) def __matmul__( self, other: InvariantLinearAutomorphism | LinearOperator ) -> InvariantLinearAutomorphism | LinearOperator: """Composing two invariant operators via element-wise multiplication.""" if isinstance(other, InvariantLinearAutomorphism): if self.codomain != other.domain: raise ValueError("Domain/Codomain mismatch.") my_diagonal = self.eigenvalues other_diagonal = other.eigenvalues return InvariantLinearAutomorphism( self.domain, my_diagonal * other_diagonal ) return super().__matmul__(other)
[docs] class InvariantGaussianMeasure(GaussianMeasure): """ A class for GaussianMeasures on SymmetricHilbertSpaces whose covariances are invariant under the symmetry group. The covariances can be expressed as a function of the Laplace-Beltrami operator and are diagonal within the eigenfunction basis. """ def __init__( self, domain: SymmetricHilbertSpace, spectral_variances: np.ndarray, /, *, expectation: Optional[Vector] = None, ): """ Initializes the InvariantGaussianMeasure. Args: domain: The symmetric space the measure is defined on. spectral_variances: A 1D array of variances associated with the eigenbasis (i.e., the eigenvalues of the covariance operator). expectation: The mean vector. Defaults to zero. """ self._spectral_variances = spectral_variances covariance = InvariantLinearAutomorphism(domain, spectral_variances) squared_norms = domain.squared_norms self._kl_scaling_array = np.sqrt(spectral_variances / squared_norms) inverse_covariance = None if np.all(spectral_variances > 0): inverse_covariance = InvariantLinearAutomorphism( domain, np.reciprocal(spectral_variances) ) super().__init__( covariance=covariance, expectation=expectation, sample=self._kl_sample, inverse_covariance=inverse_covariance, ) # ---------------------------------------------------------- # # Constructors # # ---------------------------------------------------------- #
[docs] @staticmethod def from_index_function( domain: SymmetricHilbertSpace, g: Callable[Index, float], /, *, expectation=None ) -> InvariantGaussianMeasure: """ Returns an invariant Gaussian measure on a symmetric Hilbert space whose covariance is of the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. Here the function, f, is expressed implicitly as a function, g, of the eigenvalue index. Args: domain: The domain of the operator g: The function expressed in terms of the eigenvalue index expectation: The expected value for the measure. Defaults to None which means zero. """ spectral_variances = np.fromiter( (g(k) for k in domain.indices), dtype=float, count=domain.dim, ) return InvariantGaussianMeasure( domain, spectral_variances, expectation=expectation )
[docs] @staticmethod def from_function( domain: SymmetricHilbertSpace, f: Callable[float, float], /, *, expectation=None ) -> InvariantGaussianMeasure: """ Returns an invariant Gaussian measure on a symmetric Hilbert space whose covariance is of the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. Args: domain: The domain of the operator f: The function expressed in terms of the eigenvalue index expectation: The expected value for the measure. Defaults to None which means zero. """ return InvariantGaussianMeasure.from_index_function( domain, lambda k: f(domain.laplacian_eigenvalue(k)), expectation=expectation )
# ---------------------------------------------------------- # # Properties # # ---------------------------------------------------------- # @property def spectral_variances(self) -> np.ndarray: """ Provides instant access to the exact eigenvalues of the covariance operator, useful for log-determinant or KL-divergence calculations. """ return self._spectral_variances # ---------------------------------------------------------- # # Public methods # # ---------------------------------------------------------- #
[docs] def zero_expectation(self) -> InvariantGaussianMeasure: """ Returns a new invariant measure with the same covariance, but with expectation set to zero. """ if self.has_zero_expectation: return self return InvariantGaussianMeasure( self.domain, self.spectral_variances, expectation=None, )
[docs] def rescale_norm_variance(self, std: float) -> InvariantGaussianMeasure: """ Returns a new measure whose covariance is scaled such that E[||x - E[x]||^2] = std^2. The expectation of the measure is unchanged. """ current_trace = self.covariance.trace if current_trace <= 0: raise ValueError("Trace must be positive to perform rescaling.") scale_factor_squared = (std**2) / current_trace new_variances = self.spectral_variances * scale_factor_squared return InvariantGaussianMeasure( self.domain, new_variances, expectation=self._expectation, # Pass the raw attribute to avoid zero instantiation )
# ------------------------------------------------------# # Overloads of base class methods # # ------------------------------------------------------#
[docs] def affine_mapping( self, /, *, operator: LinearOperator = None, translation: Vector = None, affine_operator: AffineOperator = None, inverse_solver: LinearSolver = None, inverse_preconditioner: LinearOperator = None, ) -> GaussianMeasure: """ Transforms the measure under an affine map `y = A(x) + b`. If the operator A is an `InvariantLinearAutomorphism` (or None), the resulting measure remains invariant. If it is a generic operator and an inverse solve is requested, a spectrally-regularized preconditioner is automatically built. """ if operator is None and affine_operator is None: new_expectation = ( translation if self.has_zero_expectation else ( self.domain.add(self.expectation, translation) if translation else self.expectation ) ) return InvariantGaussianMeasure( self.domain, self.spectral_variances, expectation=new_expectation ) _op = affine_operator.linear_part if affine_operator else operator if isinstance(_op, InvariantLinearAutomorphism): new_covariance = self.covariance @ _op @ _op if self.has_zero_expectation: new_expectation = ( translation if affine_operator is None else affine_operator.translation_part ) else: mapped_exp = _op(self.expectation) _trans = ( affine_operator.translation_part if affine_operator else translation ) new_expectation = ( self.domain.add(mapped_exp, _trans) if _trans else mapped_exp ) return InvariantGaussianMeasure( self.domain, new_covariance.eigenvalues, expectation=new_expectation ) if inverse_solver is not None and inverse_preconditioner is None: if isinstance(inverse_solver, IterativeLinearSolver): # A. Top-Left Block: "Water-filling" spectral regularization max_var = np.max(self.spectral_variances) threshold = max_var * 1e-8 reg_variances = np.maximum(self.spectral_variances, threshold) reg_cov = InvariantLinearAutomorphism(self.domain, reg_variances) # B. Bottom-Right Block: Delegate Jacobi Schur logic to the base class new_covariance = _op @ self.covariance @ _op.adjoint bottom_right_block = self._build_jacobi_schur_block(_op, new_covariance) inverse_preconditioner = BlockDiagonalLinearOperator( [reg_cov, bottom_right_block] ) # 3. Defer to the base class with the injected smart preconditioner return super().affine_mapping( operator=operator, translation=translation, affine_operator=affine_operator, inverse_solver=inverse_solver, inverse_preconditioner=inverse_preconditioner, )
[docs] def kl_divergence( self, other: GaussianMeasure, /, *, method: Literal["auto", "dense", "randomized"] = "auto", **kwargs, ) -> float: """ Computes the exact or approximate Kullback-Leibler (KL) divergence D_KL(self || other). If both measures are InvariantGaussianMeasures on the same domain, and method="auto", this calculates the divergence in ultra-fast O(N) time directly from the spectral variances. """ if ( method == "auto" and isinstance(other, InvariantGaussianMeasure) and self.domain == other.domain ): k = self.domain.dim var_p = self.spectral_variances var_q = other.spectral_variances # 1. Trace Term: Tr(Q^-1 P) trace_term = np.sum(var_p / var_q) # 2. Log-Det Term: ln|Q| - ln|P| log_det_term = np.sum(np.log(var_q)) - np.sum(np.log(var_p)) # 3. Mahalanobis Term: <mu_P - mu_Q, Q^-1 (mu_P - mu_Q)> if self.has_zero_expectation and other.has_zero_expectation: mahalanobis_term = 0.0 else: diff = self.domain.subtract(self.expectation, other.expectation) inv_Q = other.covariance.inverse diff_invQ = inv_Q(diff) mahalanobis_term = self.domain.inner_product(diff, diff_invQ) return float(0.5 * (trace_term + mahalanobis_term - k + log_det_term)) # Fallback to the base class (which handles 'dense' and 'randomized') _method = "dense" if method == "auto" else method return super().kl_divergence(other, method=_method, **kwargs)
def __neg__(self) -> InvariantGaussianMeasure: """Returns the measure with a negated expectation. Covariance is unchanged.""" return InvariantGaussianMeasure( self.domain, self.spectral_variances, expectation=( None if self.has_zero_expectation else self.domain.negative(self.expectation) ), ) def __mul__(self, alpha: float) -> InvariantGaussianMeasure: """Scales the measure by a scalar alpha.""" new_variances = (alpha**2) * self.spectral_variances new_expectation = ( None if self.has_zero_expectation else self.domain.multiply(alpha, self.expectation) ) return InvariantGaussianMeasure( self.domain, new_variances, expectation=new_expectation, ) def __rmul__(self, alpha: float) -> InvariantGaussianMeasure: return self * alpha def __truediv__(self, alpha: float) -> InvariantGaussianMeasure: return self * (1.0 / alpha) def __add__(self, other: GaussianMeasure) -> GaussianMeasure: """Adds two independent Gaussian measures.""" if self.domain != other.domain: raise ValueError("Measures must be defined on the same domain.") if isinstance(other, InvariantGaussianMeasure): new_variances = self.spectral_variances + other.spectral_variances if self.has_zero_expectation and other.has_zero_expectation: new_expectation = None elif self.has_zero_expectation: new_expectation = other.expectation elif other.has_zero_expectation: new_expectation = self.expectation else: new_expectation = self.domain.add(self.expectation, other.expectation) return InvariantGaussianMeasure( self.domain, new_variances, expectation=new_expectation, ) return super().__add__(other) def __sub__(self, other: GaussianMeasure) -> GaussianMeasure: """Subtracts two independent Gaussian measures.""" if self.domain != other.domain: raise ValueError("Measures must be defined on the same domain.") if isinstance(other, InvariantGaussianMeasure): new_variances = self.spectral_variances + other.spectral_variances if self.has_zero_expectation and other.has_zero_expectation: new_expectation = None elif self.has_zero_expectation: new_expectation = self.domain.negative(other.expectation) elif other.has_zero_expectation: new_expectation = self.expectation else: new_expectation = self.domain.subtract( self.expectation, other.expectation ) return InvariantGaussianMeasure( self.domain, new_variances, expectation=new_expectation, ) return super().__sub__(other) # ------------------------------------------------------# # Private methods # # ------------------------------------------------------# def _kl_sample(self) -> Vector: """ Draws a sample using the Karhunen-Loève expansion. """ xi = np.random.randn(self.domain.dim) scaled_components = xi * self._kl_scaling_array sample_vector = self.domain.from_components(scaled_components) if self.has_zero_expectation: return sample_vector return self.domain.add(sample_vector, self.expectation)
[docs] class SymmetricHilbertSpace(HilbertSpace, ABC): """ An abstract base class for Hilbert spaces of functions spaces on symmetric manifolds. The implementation is based on the expansion of elements of the space in terms of the eigenfunctions of the Laplace-Beltrami operator. To inherit from this base class, the user must provide methods that provide information on the eigenvalues and eigenfunctions of the Laplace-Beltrami operator, including mappings to and from the co-ordinate basis. The eigenfunction basis is necessarily orthogonal, but need not be normalised. """ def __init__(self, spatial_dim: int, degree: Degree, dim: int, orthonormal: bool): """ Initializes the abstract invariant Lebesgue space. Args: spatial_dim: The dimension of the symmetric manifold degree: The truncation degree. dim: The dimension of the space. orthonormal: True if the eigenfunction basis is orthonormal. """ self._spatial_dim = spatial_dim self._degree = degree self._dim = dim self._orthonormal = orthonormal if self._orthonormal: self._metric = None self._inverse_metric = None else: metric_values = np.fromiter( (self.laplacian_eigenvector_squared_norm(k) for k in self.indices), dtype=float, count=self.dim, ) self._metric = diags([metric_values], [0]) self._inverse_metric = diags([np.reciprocal(metric_values)], [0]) # Add to SymmetricHilbertSpace
[docs] @staticmethod def heat_kernel(scale: float) -> Callable[[float], float]: """Returns a heat kernel covariance function: exp(-scale^2 * k).""" return lambda k: np.exp(-(scale**2) * k)
[docs] @staticmethod def sobolev_kernel(order: float, scale: float) -> Callable[[float], float]: """Returns a Sobolev kernel covariance function: (1 + scale^2 * k)^-order.""" return lambda k: (1.0 + scale**2 * k) ** (-order)
# ------------------------------------------------------------# # Properties # # ------------------------------------------------------------# @property def spatial_dimension(self) -> int: """The dimension of the symetric manifold.""" return self._spatial_dim @property def degree(self) -> Degree: """The spectral truncation degree of the space.""" return self._degree @property def dim(self) -> int: return self._dim @property def orthonormal(self) -> bool: """ True if the eigenfunction basis is normalised """ return self._orthonormal @property def squared_norms(self) -> np.ndarray: """ Returns a vector of the squared eigenvector norms. """ if self.orthonormal: return np.ones(self.dim, dtype=float) else: return self._metric.diagonal(k=0) @property def indices(self) -> Iterator[Index]: """ Returns a list of all the eigenvalue indices """ return (self.integer_to_index(i) for i in range(self.dim)) @property def laplacian(self) -> InvariantLinearAutomorphism: """ Returns the Laplacian, Δ, as an operator on the space. Note that this is an unbounded operator and it should be used with care. """ return self.invariant_automorphism(lambda k: k) # ------------------------------------------------------------# # Optimized Base Operator Overrides # # ------------------------------------------------------------#
[docs] def identity_operator(self) -> InvariantLinearAutomorphism: """ Returns the identity operator on the space. Overridden to return an InvariantLinearAutomorphism, ensuring that compositions with other invariant operators preserve the fast-path diagonal structure. """ return InvariantLinearAutomorphism(self, np.ones(self.dim, dtype=float))
[docs] def zero_operator(self, codomain: Optional[HilbertSpace] = None) -> LinearOperator: """ Returns the zero operator. If the codomain is the space itself (or not provided), it returns an InvariantLinearAutomorphism for optimal arithmetic operations. Otherwise, it falls back to the standard abstract zero operator. """ codomain = self if codomain is None else codomain if codomain == self: return InvariantLinearAutomorphism(self, np.zeros(self.dim, dtype=float)) return super().zero_operator(codomain=codomain)
# ------------------------------------------------------------# # Abstract methods # # ------------------------------------------------------------# @property @abstractmethod def gaussian_curvature(self) -> float: """ Returns the Gaussian curvature (K) of the symmetric manifold. """
[docs] @abstractmethod def index_to_integer(self, k: Index) -> int: """ Maps an eigenvalue index to an integer. """
[docs] @abstractmethod def integer_to_index(self, i: int) -> Index: """ Maps an integer to the eigenvalue index """
[docs] @abstractmethod def laplacian_eigenvalue(self, k: Index) -> float: """ Returns the eigenvalue of the Laplacian for a given index. The index `k` can be a single integer (e.g., for a circle) or a tuple of integers (e.g., for a sphere or torus), depending on the geometry of the space. Args: k: The index of the eigenvalue to return. """
[docs] @abstractmethod def laplacian_eigenvector_squared_norm(self, k: Index) -> float: """ Returns the squared norm of the eigenvalue of the Laplacian for a given index. The index `k` can be a single integer (e.g., for a circle) or a tuple of integers (e.g., for a sphere or torus), depending on the geometry of the space. Args: k: The index of the eigenvalue to return. """
[docs] @abstractmethod def laplacian_eigenvectors_at_point(self, x: Point) -> np.ndarray: """ Returns a list of the values of the eigenvectors at a given point Args: x: The evaluation point. """
[docs] @abstractmethod def random_point(self) -> Any: """Returns a single random point from the underlying symmetric space."""
[docs] @abstractmethod def geodesic_distance(self, p1: Any, p2: Any) -> float: """ Returns the shortest distance along the manifold between two points. Args: p1: The starting point. p2: The end point. """
[docs] @abstractmethod def geodesic_quadrature( self, p1: Any, p2: Any, n_points: int ) -> Tuple[List[Any], np.ndarray]: """ Returns quadrature points and weights for a geodesic between p1 and p2. Returns: points: List of manifold coordinates. weights: Integration weights scaled by the line element. """
[docs] def geodesic_ball_quadrature( self, center: Any, radius: float, n_points: int ) -> Tuple[List[Any], np.ndarray]: r"""Return quadrature points and weights for a geodesic ball. Subclasses may implement geometry-specific quadrature rules. The weights should include the manifold volume element so that ``sum_i weights[i] * f(points[i])`` approximates $\int_{B(center, radius)} f(x)\,dV(x)$. """ raise NotImplementedError( "Geodesic-ball quadrature is not implemented for this symmetric space." )
[docs] def geodesic_ball_integral( self, center: Any, radius: float, /, *, n_points: Optional[int] = None, normalize: bool = False, ) -> LinearForm: r"""Return a linear form for a geodesic-ball integral. The returned form represents $\int_{B(center, radius)} u(x)\,dV(x)$. If ``normalize`` is true, it represents the average over the geodesic ball instead. Subclasses with exact spectral formulas can override this method. Otherwise, a subclass-provided ``geodesic_ball_quadrature`` rule is used. Args: center: Center point of the geodesic ball. radius: Geodesic radius in the manifold's distance units. n_points: Number of quadrature points requested from ``geodesic_ball_quadrature``. normalize: If true, divide by the quadrature measure of the ball. """ if n_points is None: raise NotImplementedError( "Automatic geodesic-ball quadrature density is not available. " "Pass n_points or use a space with an exact implementation." ) points, weights = self.geodesic_ball_quadrature(center, radius, n_points) measure = float(np.sum(weights)) components = np.zeros(self.dim) for point, weight in zip(points, weights): components += weight * np.asarray( self.laplacian_eigenvectors_at_point(point) ) if normalize: if measure <= 0.0: raise ValueError("Cannot normalize a zero-measure geodesic ball.") components /= measure return LinearForm(self, components=components)
[docs] def geodesic_ball_average( self, center: Any, radius: float, /, *, n_points: Optional[int] = None, ) -> LinearForm: r"""Return a linear form for the average over a geodesic ball.""" return self.geodesic_ball_integral( center, radius, n_points=n_points, normalize=True, )
[docs] @abstractmethod def with_degree(self, degree: Degree) -> SymmetricHilbertSpace: """Returns a new instance of the space with a modified truncation degree."""
[docs] @abstractmethod def degree_transfer_operator(self, target_degree: Degree) -> LinearOperator: """Returns the transfer operator from this space to one with a different degree."""
[docs] @abstractmethod def invariant_covariance_function( self, spectral_variances: np.ndarray ) -> Callable[[np.ndarray], np.ndarray]: """ Returns a vectorized function that evaluates the two-point covariance of an invariant measure as a function of geodesic distance. """
[docs] @abstractmethod def degree_multiplicity(self, degree: int) -> int: """Returns the number of eigenfunctions associated with a given degree."""
[docs] @abstractmethod def representative_index(self, degree: int) -> Index: """Returns a valid eigenfunction index for the given degree."""
[docs] @abstractmethod def project_function(self, f: Callable[Point, Value]) -> Vector: """ Returns an element of the space by projecting a given function. Args: f: A function that takes a point and returns a value. """
# ------------------------------------------------------------# # Public methods # # ------------------------------------------------------------#
[docs] def to_dual(self, x: Vector) -> LinearForm: cx = self.to_components(x) if self.orthonormal: return LinearForm(self, components=cx) else: cxp = self._metric @ cx return LinearForm(self, components=cxp)
[docs] def from_dual(self, xp: LinearForm) -> Vector: cxp = xp.components if self.orthonormal: return self.from_components(cxp) else: cx = self._inverse_metric @ cxp return self.from_components(cx)
[docs] def random_points(self, n: int) -> List[Point]: """ Returns a list of `n` random points. Args: n: The number of random points to generate. """ return [self.random_point() for _ in range(n)]
[docs] def invariant_automorphism( self, f: Callable[float, float] ) -> InvariantLinearAutomorphism: """ Returns an automorphism of the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. Args: f: The function """ return InvariantLinearAutomorphism.from_function(self, f)
[docs] def invariant_gaussian_measure( self, f: Callable[float, float], /, *, expectation=None ) -> InvariantGaussianMeasure: """ Returns a Gaussian measure on the space whose covariance takes the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. In order for the covariance to be well-defined, the trace of the covariance operator must be finite. This condition (in the sense of convergence as the size of the approximating space being increased) is not checked. Args: f: The function expectation: The expected value for measure. Defaults to zero Returns: The measure as an instance of InvariantGaussianMeasure """ return InvariantGaussianMeasure.from_function(self, f, expectation=expectation)
[docs] def norm_scaled_invariant_gaussian_measure( self, f: Callable[float, float], /, *, expectation=None, std=1.0 ) -> InvariantGaussianMeasure: """ Returns a Gaussian measure on the space whose covariance takes the form f(Δ) with f a function that is well-defined on the spectrum of the Laplacian, Δ. In order for the covariance to be well-defined, the trace of the covariance operator must be finite. This condition (in the sense of convergence as the size of the approximating space being increased) is not checked. The measure's covariance is scaled such that: E[||x||^2] = std*std + ||E[x]||^2 Args: f: The function expectation: The expected value for measure. Defaults to zero std: The desired standard deviation Returns: The measure as an instance of InvariantGaussianMeasure """ initial_measure = self.invariant_gaussian_measure(f, expectation=expectation) return initial_measure.rescale_norm_variance(std)
[docs] def sobolev_kernel_gaussian_measure( self, order: float, scale: float, /, *, expectation: Vector = None ): """ Returns an invariant Gaussian measure with a Sobolev-type covariance equal to (1 + scale^2 * Δ)^-order. Args: order: Order parameter for the covariance. scale: Scale parameter for the covariance. expectation: The expected value for measure. Defaults to zero """ return self.invariant_gaussian_measure( self.sobolev_kernel(order, scale), expectation=expectation )
[docs] def norm_scaled_sobolev_kernel_gaussian_measure( self, order: float, scale: float, /, *, expectation: Vector = None, std: float = 1, ): """ Returns an invariant Gaussian measure with a Sobolev-type covariance proportional to (1 + scale^2 * Δ)^-order. The measure's covariance is scaled such that the expected value for the samples norm is equal to the given standard deviation. Args: order: Order parameter for the covariance. scale: Scale parameter for the covariance. expectation: The expected value for measure. Defaults to zero std: The desired standard deviation """ return self.norm_scaled_invariant_gaussian_measure( lambda k: (1 + scale**2 * k) ** -order, expectation=expectation, std=std )
[docs] def heat_kernel_gaussian_measure( self, scale: float, /, *, expectation: Vector = None ): """ Returns an invariant Gaussian measure with a heat kernel covariance equal to exp(-scale^2 * Δ). Args: scale: Scale parameter for the covariance. expectation: The expected value for measure. Defaults to zero """ return self.invariant_gaussian_measure( self.heat_kernel(scale), expectation=expectation )
[docs] def norm_scaled_heat_kernel_gaussian_measure( self, scale: float, /, *, expectation: Vector = None, std: float = 1 ): """ Returns an invariant Gaussian measure with a heat kernel covariance equal to exp(-scale^2 * Δ). Args: scale: Scale parameter for the covariance. expectation: The expected value for measure. Defaults to zero std: The desired standard deviation """ return self.norm_scaled_invariant_gaussian_measure( lambda k: np.exp(-(scale**2) * k), expectation=expectation, std=std )
[docs] def cluster_points( self, points: List[Point], /, *, threshold: Optional[float] = None, n_clusters: Optional[int] = None, linkage_method: str = "complete", ) -> List[List[int]]: """ Clusters a list of points into interacting blocks based on their geodesic distance. This is particularly useful for generating the 'interacting_blocks' required by localized preconditioners in Bayesian inversions. Args: points: A list of points on the symmetric manifold. threshold: The maximum geodesic distance between points in a cluster. n_clusters: The exact number of clusters to form (alternative to threshold). linkage_method: The hierarchical clustering method to use ('complete', 'average', 'single', etc.). Defaults to 'complete' to ensure clusters remain geographically compact. Returns: A list of lists, where each sub-list contains the indices of the points belonging to a specific cluster. """ from scipy.cluster.hierarchy import linkage, fcluster n = len(points) if n == 0: return [] if n == 1: return [[0]] if threshold is None and n_clusters is None: raise ValueError("You must specify either 'threshold' or 'n_clusters'.") # 1. Compute pairwise geodesic distances (in SciPy's condensed 1D format) # A condensed matrix is required by the linkage function distances = np.zeros(n * (n - 1) // 2) idx = 0 for i in range(n): for j in range(i + 1, n): distances[idx] = self.geodesic_distance(points[i], points[j]) idx += 1 # 2. Perform agglomerative hierarchical clustering Z = linkage(distances, method=linkage_method) # 3. Extract the flat clusters based on the user's criteria if n_clusters is not None: labels = fcluster(Z, t=n_clusters, criterion="maxclust") else: labels = fcluster(Z, t=threshold, criterion="distance") # 4. Group the point indices by their assigned cluster label blocks = {} for i, label in enumerate(labels): blocks.setdefault(label, []).append(i) return list(blocks.values())
[docs] def pairs_within_distance( self, points: List[Point], max_distance: float ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Finds all pairs of points within a given geodesic distance. Returns arrays of row indices, column indices, and their geodesic distances. """ n = len(points) rows, cols, dists = [], [], [] for i in range(n): for j in range(n): # Full symmetric sweep d = self.geodesic_distance(points[i], points[j]) if d <= max_distance: rows.append(i) cols.append(j) dists.append(d) return np.array(rows), np.array(cols), np.array(dists)
[docs] def estimate_truncation_degree( self, covariance_function: Callable[[float], float], /, *, rtol: float = 1e-6, min_degree: int = 0, max_degree: Optional[int] = None, ) -> int: """ Estimates the truncation degree required to capture the expected energy of a function drawn from a prior measure. """ summation = 0.0 degree = 0 err = 1.0 while err > rtol: # Safety ceiling if max_degree is not None and degree >= max_degree: return max_degree idx = self.representative_index(degree) lambda_val = self.laplacian_eigenvalue(idx) variance = covariance_function(lambda_val) norm_sq = self.laplacian_eigenvector_squared_norm(idx) multiplicity = self.degree_multiplicity(degree) term_energy = variance * multiplicity * norm_sq summation += term_energy if summation > 0: err = term_energy / summation else: err = 1.0 if err <= rtol: break degree += 1 if degree > 100000: raise RuntimeError("Failed to converge on a stable truncation degree.") # Safety floor return max(degree, min_degree)
[docs] def l2_products_operator(self, weighting_functions: List[Any]) -> LinearOperator: """ Creates an operator that computes a vector of L2 inner products. The action on a function 'u' returns a vector 'd' where d_i = <u, w_i>_L2. Args: weighting_functions: A list of vectors representing the weighting kernels. Returns: LinearOperator: A mapping from this space to EuclideanSpace(len(weighting_functions)). """ return LinearOperator.from_vectors(self, weighting_functions)
[docs] class AbstractSymmetricLebesgueSpace(HilbertModule, SymmetricHilbertSpace, ABC): """ A specialisation for scalar-valued L² function spaces on symmetric manifolds. To be instantiated, such a class must provide the following additional methods: vector_multiply vector_sqrt """
[docs] def gradient_dot_product(self, f: Vector, g: Vector) -> Vector: """ Computes the pointwise dot product of the surface gradients of two fields. """ L = self.laplacian fg = self.vector_multiply(f, g) f_Lg = self.vector_multiply(f, L(g)) g_Lf = self.vector_multiply(g, L(f)) result = L(fg) self.axpy(-1.0, f_Lg, result) self.axpy(-1.0, g_Lf, result) self.ax(0.5, result) return result
[docs] def flexural_operator( self, flexural_rigidity: Union[Vector, float, int], poisson_ratio: Union[Vector, float, int], buoyancy_factor: Union[Vector, float, int], ) -> LinearOperator: r""" Constructs the generalized 4th-order variable-coefficient flexure operator for a thin elastic shell on a Riemannian manifold. This operator models the normal deflection of a shell subjected to a transverse load. It accounts for spatially varying material properties without requiring explicit tensor calculus by utilizing Bochner-style identities to express the inner product of Hessians purely in terms of Laplacians and surface gradients. The operator evaluates: Op(w) = L(D * L(w)) - tr(Hess(D_eff) * Hess(w)) + rho_g * w Where: * w is the normal deflection. * D is the flexural rigidity. * D_eff is the effective rigidity: D * (1 - nu). * nu is Poisson's ratio. * rho_g is the foundation restoring force (buoyancy factor). * L() is the Laplace-Beltrami operator. Mathematical Details (The Hessian Calculation): --------------------------------------------- Because the framework is built on the spectral properties of the Laplacian, it does not compute Hessians directly. Instead, it uses a well-known identity for the inner product of two Hessians on a 2D manifold: tr(Hess(A) * Hess(B)) = 0.5 * L(grad(A) . grad(B)) - 0.5 * (grad(L(A)) . grad(B)) - 0.5 * (grad(A) . grad(L(B))) - K * (grad(A) . grad(B)) Here, K is the Gaussian curvature of the manifold. In flat Euclidean space, derivatives commute and K = 0. On a curved manifold (like a sphere), derivatives do not commute. The Gaussian curvature K acts as the precise geometric penalty required to correct for this non-commutativity. The method `gradient_dot_product` is used to evaluate the (grad(A) . grad(B)) terms internally. Args: flexural_rigidity: The flexural rigidity field (D). Can be a spatially varying Vector in this space or a constant scalar. poisson_ratio: Poisson's ratio (nu). Can be a spatially varying Vector or a constant scalar. buoyancy_factor: The density contrast or foundation restoring force (rho * g). Can be a spatially varying Vector or a constant scalar. Returns: LinearOperator: A self-adjoint linear operator mapping a deflection field to its corresponding load field. """ L = self.laplacian K = self.gaussian_curvature if isinstance(flexural_rigidity, (float, int)): D = self.project_function(lambda _: flexural_rigidity) else: D = flexural_rigidity if isinstance(poisson_ratio, (float, int)): nu = self.project_function(lambda _: poisson_ratio) else: nu = poisson_ratio if isinstance(buoyancy_factor, (float, int)): rho_g = self.project_function(lambda _: buoyancy_factor) else: rho_g = buoyancy_factor one_vec = self.project_function(lambda _: 1.0) one_minus_nu = self.subtract(one_vec, nu) D_eff = self.vector_multiply(D, one_minus_nu) delta_D_eff = L(D_eff) def apply_operator(w: Any) -> Any: delta_w = L(w) D_delta_w = self.vector_multiply(D, delta_w) principal_term = L(D_delta_w) grad_Deff_w = self.gradient_dot_product(D_eff, w) poisson_penalty = self.multiply(0.5, L(grad_Deff_w)) term2_base = self.gradient_dot_product(delta_D_eff, w) self.axpy(-0.5, term2_base, poisson_penalty) term3_base = self.gradient_dot_product(D_eff, delta_w) self.axpy(-0.5, term3_base, poisson_penalty) self.axpy(-K, grad_Deff_w, poisson_penalty) result = principal_term self.axpy(-1.0, poisson_penalty, result) if isinstance(rho_g, (float, int)): self.axpy(float(rho_g), w, result) else: restoring_force = self.vector_multiply(rho_g, w) self.axpy(1.0, restoring_force, result) return result return LinearOperator.self_adjoint(self, apply_operator)
[docs] def inverse_flexural_operator( self, flexural_rigidity: Union[Vector, float, int], poisson_ratio: Union[Vector, float, int], buoyancy_factor: Union[Vector, float, int], /, *, baseline_rigidity: Optional[float] = None, baseline_buoyancy: Optional[float] = None, solver: Optional[IterativeLinearSolver] = None, ) -> LinearOperator: r""" Constructs the inverse of the generalized variable-coefficient flexure operator. This method acts as an intelligent factory, mapping a spatial load field to the resulting normal deflection. It utilizes a dual-path execution strategy depending on the spatial variance of the input parameters: 1. **Analytical Fast-Path:** If $D$, $\nu$, and $\rho_g$ are all constant scalars, the Poisson penalty vanishes ($\nabla D_{eff} = 0$), and the operator simplifies exactly to $D \Delta^2 + \rho_g$. The method returns an ultra-fast, exact `InvariantLinearAutomorphism` by reciprocating the spectral eigenvalues. 2. **Iterative Slow-Path:** If any parameter is a spatially varying field, the method wraps a preconditioned Conjugate Gradient (CG) iterative solver inside a standard `LinearOperator` interface. To accelerate the iterative solver, an invariant preconditioner is built using baseline scalar values for rigidity and buoyancy. If these baselines are not explicitly provided, the solver automatically computes the spatial means of the provided fields via $L^2$ projection. Args: flexural_rigidity: The flexural rigidity field $D$ or a constant scalar. poisson_ratio: Poisson's ratio $\nu$ or a constant scalar. buoyancy_factor: The restoring force $\rho_g$ or a constant scalar. baseline_rigidity: An optional scalar guess for the average rigidity, used to construct the spectral preconditioner. baseline_buoyancy: An optional scalar guess for the average buoyancy, used to construct the spectral preconditioner. solver: An optional pre-configured `IterativeLinearSolver` instance (e.g., to inject specific tolerances or progress callbacks). Defaults to a standard `CGSolver`. Returns: LinearOperator: A self-adjoint linear operator mapping a load field to its corresponding deflection field. """ all_scalars = all( isinstance(param, (float, int)) for param in (flexural_rigidity, poisson_ratio, buoyancy_factor) ) if all_scalars: D = float(flexural_rigidity) rho_g = float(buoyancy_factor) return self.invariant_automorphism(lambda k: 1.0 / (D * (k**2) + rho_g)) flexural_operator = self.flexural_operator( flexural_rigidity, poisson_ratio, buoyancy_factor ) def compute_average(f: Union[Vector, float, int]) -> float: if isinstance(f, (float, int)): return float(f) else: one = self.project_function(lambda _: 1.0) num = self.inner_product(f, one) den = self.squared_norm(one) return num / den if baseline_rigidity is not None: D0 = baseline_rigidity else: D0 = compute_average(flexural_rigidity) if baseline_buoyancy is not None: rho_0 = baseline_buoyancy else: rho_0 = compute_average(buoyancy_factor) preconditioner = self.invariant_automorphism( lambda k: 1.0 / (D0 * (k**2) + rho_0) ) _solver = CGSolver() if solver is None else solver if not isinstance(_solver, IterativeLinearSolver): raise ValueError( "The solver prodided must be an instance of IterativeLinearSolver" ) return _solver(flexural_operator, preconditioner=preconditioner)
[docs] class SymmetricSobolevSpace(MassWeightedHilbertModule, AbstractSymmetricLebesgueSpace): """ The Sobolev space Hˢ constructed over a symmetric Lebesgue space. This implementation leverages the mass-weighting framework to ensure that the inner product and dual mappings correctly account for the smoothness order and scale. """ def __init__( self, lebesgue_space: AbstractSymmetricLebesgueSpace, order: float, scale: float, /, *, safe: Optional[bool] = True, ) -> None: """ Args: lebesgue_space: The underlying L² space (which provides Δ eigenvalues). order: The Sobolev smoothness order (s). scale: The Sobolev length-scale (κ). safe: If true, the class checks for mathematical correctness of operations where possible. """ self._order = order self._scale = scale self._safe = safe mass_operator = lebesgue_space.invariant_automorphism(self.sobolev_function) inverse_mass_operator = mass_operator.inverse MassWeightedHilbertModule.__init__( self, lebesgue_space, mass_operator, inverse_mass_operator ) AbstractSymmetricLebesgueSpace.__init__( self, lebesgue_space.spatial_dimension, lebesgue_space.degree, lebesgue_space.dim, False, ) @property def order(self) -> float: """The Sobolev order.""" return self._order @property def scale(self) -> float: """The Sobolev length-scale.""" return self._scale @property def safe(self) -> bool: """ True if mathematical checks are being applied. """ return self._safe
[docs] @abstractmethod def with_order(self, order: float) -> SymmetricSobolevSpace: """ Returns a new instance of the exact same space but with a modified Sobolev order. """
[docs] def order_inclusion_operator(self, target_order: float) -> LinearOperator: """Returns the inclusion operator from this space to one of a lower order.""" if self.safe and target_order > self.order: raise ValueError( "Target order must be less than or equal to the current order." ) codomain = self.with_order(target_order) underlying_identity = self.underlying_space.identity_operator() return LinearOperator.from_formal_adjoint(self, codomain, underlying_identity)
[docs] def sobolev_function(self, lambda_val) -> float: """ Returns the value of the Sobolev function associated with the space. """ return (1.0 + (self.scale**2) * lambda_val) ** self.order
[docs] def dirac(self, point: Point) -> LinearForm: """ Returns the Dirac measure at the chosen point. """ if self.safe and self.order <= self.spatial_dimension / 2: raise NotImplementedError("The Dirac measure is not defined on this space.") cxp = self.laplacian_eigenvectors_at_point(point) return LinearForm(self, components=cxp)
[docs] def dirac_representation(self, point) -> Vector: """ Returns the representation of the Dirac measure at the chosen point. """ return self.from_dual(self.dirac(point))
[docs] def point_evaluation_operator(self, points: List[Any]) -> LinearOperator: """ Returns a linear operator that evaluates a function at a list of points. The resulting operator maps a function (a vector in this space) to a vector in Euclidean space containing the function's values at the specified locations. This is the primary mechanism for creating a forward operator that links a function field to a set of discrete measurements. Args: points: A list of points at which to evaluate the functions. """ if self.safe and self.order <= self.spatial_dimension / 2: raise NotImplementedError("Point evaluation is not defined on this space") dim = len(points) matrix = np.zeros((dim, self.dim)) for i, point in enumerate(points): cp = self.dirac(point).components matrix[i, :] = cp return LinearOperator.from_matrix( self, EuclideanSpace(dim), matrix, galerkin=True )
[docs] def point_value_scaled_invariant_gaussian_measure( self, f: Callable[[float], float], /, *, expectation: Vector = None, std: float = 1, ): """ Returns an invariant Gaussian measure with covariance proportional to f(Δ), where f must be such that this operator is trace-class. The covariance of the operator is scaled such that the standard deviation of the point-wise values are equal to the given std. Args: f: A real-valued function that is well-defined on the spectrum of the Laplacian, Δ. std: The desired standard deviation for the pointwise values. Raises: NotImplementedError: If the Sobolev order is less than n/2, with n the spatial dimension. Notes: This method applies for symmetric spaces an invariant measures. As a result, the pointwise variance is the same at all points. Internally, a random point is chosen to carry out the normalisation. """ unscaled_measure = InvariantGaussianMeasure.from_function( self, f, expectation=expectation ) return unscaled_measure.rescale_directional_variance( self.dirac_representation(self.random_point()), std )
[docs] def point_value_scaled_sobolev_kernel_gaussian_measure( self, order: float, scale: float, /, *, expectation: Vector = None, std: float = 1, ): """ Returns an invariant Gaussian measure with a Sobolev-type covariance proportional to (1 + scale^2 * Δ)^-order. The covariance of the operator is scaled such that the standard deviation of the point-wise values are equal to the given amplitude. Args: order: Order parameter for the covariance. scale: Scale parameter for the covariance. std: The desired standard deviation for the pointwise values. """ return self.point_value_scaled_invariant_gaussian_measure( lambda k: (1 + scale**2 * k) ** -order, expectation=expectation, std=std )
[docs] def point_value_scaled_heat_kernel_gaussian_measure( self, scale: float, /, *, expectation: Vector = None, std: float = 1 ): """ Returns an invariant Gaussian measure with a heat-kernel covariance proportional to exp(-scale^2 * Δ). The covariance of the operator is scaled such that the standard deviation of the point-wise values are equal to the given amplitude. Args: scale: Scale parameter for the covariance. std: The desired standard deviation for the pointwise values. """ return self.point_value_scaled_invariant_gaussian_measure( lambda k: np.exp(-(scale**2) * k), expectation=expectation, std=std )
[docs] def geodesic_integral( self, p1: Point, p2: Point, /, *, n_points: Optional[int] = None ) -> LinearForm: """ Returns a linear functional representing the line integral of a function along a geodesic path. This method approximates the integral :math:`\\int_{\\gamma} u(s) ds`, where :math:`\\gamma` is the shortest path (geodesic) connecting points `p1` and `p2`. The integral is represented as a :class:`LinearForm` in the dual space, constructed by summing weighted point evaluations (Dirac measures) along the path. For Hilbert spaces with a specified :attr:`scale`, the method can automatically determine the required quadrature density to resolve the smooth features of the space's sensitivity kernels. Args: p1: The starting point of the geodesic. The type is manifold-dependent (e.g., float for :class:`Circle`, tuple for :class:`Sphere`). p2: The end point of the geodesic. n_points (int, optional): The number of Gauss-Legendre quadrature points. If None, it is heuristically determined as: :math:`n = \\lceil (\\text{arc\\_length} / \\text{scale}) \\times 2 \\rceil`. This ensures at least two points per characteristic length-scale, providing stable sampling of the sensitivity kernel. Defaults to None. Returns: LinearForm: A linear functional whose action on a vector `u` computes the approximated line integral. Raises: NotImplementedError: If the Sobolev order :math:`s` is less than or equal to half the spatial dimension :math:`n/2`. """ if self.safe and self.order <= self.spatial_dimension / 2: raise NotImplementedError( f"Order {self.order} is too low for point evaluation on a " f"{self.spatial_dimension}D manifold." ) if n_points is None: _, temp_weights = self.geodesic_quadrature(p1, p2, n_points=2) arc_length = np.sum(temp_weights) n_points = int(np.ceil((arc_length / self.scale) * 2.0)) n_points = max(2, n_points) points, weights = self.geodesic_quadrature(p1, p2, n_points) total_components = np.zeros(self.dim) for pt, weight in zip(points, weights): total_components += weight * self.dirac(pt).components return LinearForm(self, components=total_components)
[docs] def geodesic_integral_representation( self, p1: Point, p2: Point, /, *, n_points: Optional[int] = None ) -> Vector: """ Returns the Riesz representation (sensitivity kernel) of the line integral. This maps the LinearForm (the integral functional) back into the primal Hilbert space. Visualizing this vector reveals the "sensitivity" of the line integral to perturbations at different locations in the domain. Args: p1, p2: Start and end points of the geodesic. n_points: Number of quadrature points. """ integral_form = self.geodesic_integral(p1, p2, n_points=n_points) return self.from_dual(integral_form)
[docs] def path_average_operator(self, paths, /, *, n_points=None): """ Constructs a tomographic operator mapping a function field to its line integrals along a set of geodesic paths. Note: Despite the name, this operator returns the line integral (the dual pairing of the function with the path functional) rather than a normalized average, unless the user manually scales the forms. This corresponds to the 'path average' convention often used in seismic and atmospheric tomography. Args: paths (List[Tuple[Any, Any]]): A list of start and end point pairs defining the geodesics. n_points (int, optional): The number of quadrature points per path. If None, the heuristic based on the Sobolev scale is used. Returns: LinearOperator: An operator mapping Space -> EuclideanSpace(len(paths)). The adjoint of this operator performs the 'back-projection' mapping data residuals into the function space. """ path_forms = [ self.geodesic_integral(p1, p2, n_points=n_points) for p1, p2 in paths ] return LinearOperator.from_linear_forms(path_forms)
[docs] def random_source_receiver_paths( self, n_sources: int, n_receivers: int ) -> List[Tuple[Any, Any]]: """ Generates a list of source-receiver pairs by connecting every source to every receiver. This method uses the existing :meth:`random_points` logic to generate coordinates appropriate for the specific symmetric space. For a set of S sources and R receivers, this returns a list of S*R paths. Args: n_sources: The number of random source locations to generate. n_receivers: The number of random receiver locations to generate. Returns: List[Tuple[Any, Any]]: A list of tuples, where each tuple contains a (source, receiver) pair. """ # Generate the points using the existing base class method sources = self.random_points(n_sources) receivers = self.random_points(n_receivers) # Create the full-mesh network paths = [] for src in sources: for rec in receivers: paths.append((src, rec)) return paths
[docs] def cluster_points( self, points: List[Point], /, *, threshold: Optional[float] = None, n_clusters: Optional[int] = None, linkage_method: str = "complete", ) -> List[List[int]]: """ Clusters a list of points into interacting blocks based on their geodesic distance. This is particularly useful for generating the 'interacting_blocks' required by localized preconditioners in Bayesian inversions. Args: points: A list of points on the symmetric manifold. threshold: The maximum geodesic distance between points in a cluster. n_clusters: The exact number of clusters to form (alternative to threshold). linkage_method: The hierarchical clustering method to use ('complete', 'average', 'single', etc.). Defaults to 'complete' to ensure clusters remain geographically compact. Returns: A list of lists, where each sub-list contains the indices of the points belonging to a specific cluster. """ return self.underlying_space.cluster_points( points, threshold=threshold, n_clusters=n_clusters, linkage_method=linkage_method, )
[docs] def distance_localized_preconditioner( self, prior_measure: InvariantGaussianMeasure, points: List[Point], max_distance: float, /, *, data_error_measure: Optional[GaussianMeasure] = None, apply_taper: bool = False, parallel: bool = False, n_jobs: int = -1, ) -> LinearOperator: """ Builds a highly specialized, ultra-fast sparse preconditioner for point evaluation problems when the prior measure is invariant. This exploits the property that the two-point covariance depends solely on the geodesic distance. It maps out the 1D covariance function along a deterministic path, interpolates it, optionally applies a Gaspari-Cohn taper to ensure positive-definiteness, and populates a sparse normal matrix. If max_distance is set to 0.0, it bypasses the distance calculations and instantly builds a purely diagonal (Jacobi) preconditioner. Args: prior_measure: The invariant prior Gaussian measure. points: The list of observation points. max_distance: The geodesic distance beyond which covariance is assumed to be zero (enforces sparsity). Set to 0.0 for a pure diagonal. data_error_measure: The Gaussian measure describing the data noise. Can be None if the noise is already folded into the prior_measure. apply_taper: If true, applies Gaspari-Cohn taper. parallel: If True, computes the error measure diagonal in parallel. n_jobs: Number of CPU cores to use if parallel=True. Returns: A LinearOperator representing the inverse of the approximated sparse normal matrix. """ n_data = len(points) data_space = EuclideanSpace(n_data) # ======================================================= # SPECIAL CASE: Purely Diagonal (Jacobi) Preconditioner # ======================================================= if max_distance <= 0.0: p1 = self.random_point() rep1 = self.dirac_representation(p1) q_rep1 = prior_measure.covariance(rep1) prior_var = float(self.inner_product(rep1, q_rep1)) if data_error_measure is not None: noise_variance = data_error_measure.covariance.extract_diagonal( galerkin=True, parallel=parallel, n_jobs=n_jobs ) inv_diag = 1.0 / (prior_var + noise_variance) else: inv_diag = 1.0 / prior_var def apply_diag_preconditioner(x): c_vec = data_space.to_components(x) return data_space.from_components(c_vec * inv_diag) return LinearOperator( data_space, data_space, apply_diag_preconditioner, adjoint_mapping=apply_diag_preconditioner, ) # ======================================================= # STANDARD CASE: Distance-Localized Sparse Preconditioner # ======================================================= def gaspari_cohn_vectorized(d_array, c_val): """Vectorized Gaspari-Cohn taper for lightning-fast array evaluations.""" z = d_array / c_val taper = np.zeros_like(z) mask1 = z <= 1.0 z1 = z[mask1] taper[mask1] = ( 1.0 - (5.0 / 3.0) * z1**2 + (5.0 / 8.0) * z1**3 + (1.0 / 2.0) * z1**4 - (1.0 / 4.0) * z1**5 ) mask2 = (z > 1.0) & (z <= 2.0) z2 = z[mask2] taper[mask2] = ( 4.0 - 5.0 * z2 + (5.0 / 3.0) * z2**2 + (5.0 / 8.0) * z2**3 - (1.0 / 2.0) * z2**4 + (1.0 / 12.0) * z2**5 - (2.0 / 3.0) / z2 ) return taper row_indices, col_indices, dists = self.pairs_within_distance( points, max_distance ) # Get the exact, vectorized covariance function from the manifold cov_evaluator = self.invariant_covariance_function( prior_measure.spectral_variances ) raw_vals = cov_evaluator(dists) # The Gaspari-Cohn taper remains unchanged c_taper = max_distance / 2.0 if apply_taper: tapers = gaspari_cohn_vectorized(dists, c_taper) values = raw_vals * tapers else: values = raw_vals H_sparse = sps.coo_matrix( (values, (row_indices, col_indices)), shape=(n_data, n_data) ) if data_error_measure is not None: noise_variance = data_error_measure.covariance.extract_diagonal( galerkin=True, parallel=parallel, n_jobs=n_jobs ) R_sparse = sps.diags(noise_variance) H_approx = (H_sparse + R_sparse).tocsc() else: H_approx = H_sparse.tocsc() splu_solver = splinalg.splu(H_approx) def apply_sparse_preconditioner(x): c_vec = data_space.to_components(x) c_solved = splu_solver.solve(c_vec) return data_space.from_components(c_solved) return LinearOperator( data_space, data_space, apply_sparse_preconditioner, adjoint_mapping=apply_sparse_preconditioner, )
[docs] def degree_transfer_operator(self, target_degree: Degree) -> LinearOperator: codomain = self.with_degree(target_degree) lebesgue_inclusion = self.underlying_space.degree_transfer_operator( target_degree ) return LinearOperator.from_formal_adjoint(self, codomain, lebesgue_inclusion)
[docs] def l2_products_operator(self, weighting_functions: List[Any]) -> LinearOperator: """ Creates an operator that computes a vector of L2 inner products. The action on a function 'u' returns a vector 'd' where d_i = <u, w_i>_L2. Args: weighting_functions: A list of vectors representing the weighting kernels. Returns: LinearOperator: A mapping from this space to EuclideanSpace(len(weighting_functions)). """ l2_operator = self.underlying_space.l2_products_operator(weighting_functions) return LinearOperator.from_formal_adjoint( self, l2_operator.codomain, l2_operator )
[docs] def flexural_operator( self, flexural_rigidity: Union[Vector, float, int], poisson_ratio: Union[Vector, float, int], buoyancy_factor: Union[Vector, float, int], ) -> LinearOperator: r""" Constructs the generalized 4th-order variable-coefficient flexure operator for a thin elastic shell on a Riemannian manifold. This operator models the normal deflection of a shell subjected to a transverse load. It accounts for spatially varying material properties without requiring explicit tensor calculus by utilizing Bochner-style identities to express the inner product of Hessians purely in terms of Laplacians and surface gradients. Mathematically, the operator evaluates: $$ Op(w) = \Delta(D \Delta w) - \text{tr}(\text{Hess}(D(1-\nu)) \text{Hess}(w)) + \rho_g w $$ Where: * $w$ is the normal deflection. * $D$ is the flexural rigidity. * $\nu$ is Poisson's ratio. * $\rho_g$ is the foundation restoring force (buoyancy factor). * $K$ is the Gaussian curvature of the underlying manifold (handled internally). Args: flexural_rigidity: The flexural rigidity field $D$. Can be a spatially varying Vector in this space or a constant scalar. poisson_ratio: Poisson's ratio $\nu$. Can be a spatially varying Vector or a constant scalar. buoyancy_factor: The density contrast or foundation restoring force $\rho_g$. Can be a spatially varying Vector or a constant scalar. Returns: LinearOperator: A self-adjoint linear operator mapping a deflection field to its corresponding load field. """ l2_operator = self.underlying_space.flexural_operator( flexural_rigidity, poisson_ratio, buoyancy_factor ) return LinearOperator.from_formally_self_adjoint(self, l2_operator)
[docs] def inverse_flexural_operator( self, flexural_rigidity: Union[Vector, float, int], poisson_ratio: Union[Vector, float, int], buoyancy_factor: Union[Vector, float, int], /, *, baseline_rigidity: Optional[float] = None, baseline_buoyancy: Optional[float] = None, solver: Optional[IterativeLinearSolver] = None, ) -> LinearOperator: r""" Constructs the inverse of the generalized variable-coefficient flexure operator. This method acts as an intelligent factory, mapping a spatial load field to the resulting normal deflection. It utilizes a dual-path execution strategy depending on the spatial variance of the input parameters: 1. **Analytical Fast-Path:** If $D$, $\nu$, and $\rho_g$ are all constant scalars, the Poisson penalty vanishes ($\nabla D_{eff} = 0$), and the operator simplifies exactly to $D \Delta^2 + \rho_g$. The method returns an ultra-fast, exact `InvariantLinearAutomorphism` by reciprocating the spectral eigenvalues. 2. **Iterative Slow-Path:** If any parameter is a spatially varying field, the method wraps a preconditioned Conjugate Gradient (CG) iterative solver inside a standard `LinearOperator` interface. To accelerate the iterative solver, an invariant preconditioner is built using baseline scalar values for rigidity and buoyancy. If these baselines are not explicitly provided, the solver automatically computes the spatial means of the provided fields via $L^2$ projection. Args: flexural_rigidity: The flexural rigidity field $D$ or a constant scalar. poisson_ratio: Poisson's ratio $\nu$ or a constant scalar. buoyancy_factor: The restoring force $\rho_g$ or a constant scalar. baseline_rigidity: An optional scalar guess for the average rigidity, used to construct the spectral preconditioner. baseline_buoyancy: An optional scalar guess for the average buoyancy, used to construct the spectral preconditioner. solver: An optional pre-configured `IterativeLinearSolver` instance (e.g., to inject specific tolerances or progress callbacks). Defaults to a standard `CGSolver`. Returns: LinearOperator: A self-adjoint linear operator mapping a load field to its corresponding deflection field. """ l2_operator = self.underlying_space.inverse_flexural_operator( flexural_rigidity, poisson_ratio, buoyancy_factor, baseline_rigidity=baseline_rigidity, baseline_buoyancy=baseline_buoyancy, solver=solver, ) return LinearOperator.from_formal_adjoint( self, l2_operator.codomain, l2_operator )
# ------------------------------------------------------- # # Methods defered to the Lebesgue space # # ------------------------------------------------------- # @property def gaussian_curvature(self) -> float: return self.underlying_space.gaussian_curvature
[docs] def index_to_integer(self, k: Index) -> int: return self.underlying_space.index_to_integer(k)
[docs] def integer_to_index(self, i: int) -> Index: return self.underlying_space.integer_to_index(i)
[docs] def laplacian_eigenvalue(self, k: Index) -> float: return self.underlying_space.laplacian_eigenvalue(k)
[docs] def laplacian_eigenvector_squared_norm(self, k: Index) -> float: l2_norm_sq = self.underlying_space.laplacian_eigenvector_squared_norm(k) lambda_k = self.laplacian_eigenvalue(k) weight = self.sobolev_function(lambda_k) return l2_norm_sq * weight
[docs] def laplacian_eigenvectors_at_point(self, x: Point) -> List[Value]: return self.underlying_space.laplacian_eigenvectors_at_point(x)
[docs] def random_point(self) -> Point: return self.underlying_space.random_point()
[docs] def geodesic_distance(self, p1: Point, p2: Point) -> float: return self.underlying_space.geodesic_distance(p1, p2)
[docs] def geodesic_quadrature( self, p1: Any, p2: Any, n_points: int ) -> Tuple[List[Any], np.ndarray]: return self.underlying_space.geodesic_quadrature(p1, p2, n_points)
[docs] def geodesic_ball_quadrature( self, center: Any, radius: float, n_points: int ) -> Tuple[List[Any], np.ndarray]: return self.underlying_space.geodesic_ball_quadrature(center, radius, n_points)
[docs] def geodesic_ball_integral( self, center: Any, radius: float, /, *, n_points: Optional[int] = None, normalize: bool = False, ) -> LinearForm: try: underlying_form = self.underlying_space.geodesic_ball_integral( center, radius, n_points=n_points, normalize=normalize, ) except NotImplementedError: return super().geodesic_ball_integral( center, radius, n_points=n_points, normalize=normalize, ) return LinearForm(self, components=underlying_form.components.copy())
[docs] def pairs_within_distance( self, points: List[Point], max_distance: float ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: return self.underlying_space.pairs_within_distance(points, max_distance)
[docs] def invariant_covariance_function( self, spectral_variances: np.ndarray ) -> Callable[[np.ndarray], np.ndarray]: return self.underlying_space.invariant_covariance_function(spectral_variances)
[docs] def degree_multiplicity(self, degree: int) -> int: return self.underlying_space.degree_multiplicity(degree)
[docs] def representative_index(self, degree: int) -> Index: return self.underlying_space.representative_index(degree)
def __eq__(self, other: object) -> bool: """ Checks for mathematical equality with another Sobolev spaces. """ if not isinstance(other, SymmetricSobolevSpace): return NotImplemented check1 = self.underlying_space == other.underlying_space check2 = self.order == other.order check3 = self.scale == other.scale return check1 and check2 and check3