Source code for pygeoinf.linear_bayesian

"""
Implements the Bayesian framework for solving linear inverse problems.

This module treats the inverse problem from a statistical perspective. Rather
than seeking a single deterministic "best-fit" solution, it aims to determine
the full posterior probability distribution of the unknown model parameters
given the observed data, prior knowledge, and noise statistics.

A core feature of this module is its dual algebraic formalism, allowing users to
optimize computational efficiency based on the problem geometry:

- **data_space**: Assembles the data-space normal operator (size M x M, where M is
  the data dimension).
  Normal Operator: `N = A Q A* + R`
  Kalman Gain:     `K = Q A* N^-1`
  Best suited for underdetermined problems (M << N).

- **model_space**: Assembles the model-space normal operator (size N x N, where N is
  the model dimension).
  Normal Operator: `N = Q^-1 + A* R^-1 A`
  Kalman Gain:     `K = N^-1 A* R^-1`
  Best suited for overdetermined problems (N << M).

Key Classes
-----------
- `LinearBayesianInversion`: Computes the posterior Gaussian measure `p(u|d)`
  for the model `u` given observed data `d`.
"""

from __future__ import annotations

from typing import Optional, List, Literal, Union

from joblib import Parallel, delayed
import numpy as np

import scipy.sparse as sps
import scipy.sparse.linalg as splinalg

from .inversion import LinearInversion
from .gaussian_measure import GaussianMeasure
from .forward_problem import LinearForwardProblem
from .linear_operators import (
    LinearOperator,
    DiagonalSparseMatrixLinearOperator,
)
from .linear_solvers import (
    LinearSolver,
    IterativeLinearSolver,
    ResidualTrackingCallback,
)
from .hilbert_space import Vector, EuclideanSpace, HilbertSpace
from .affine_operators import AffineOperator
from .low_rank import LowRankSVD, LowRankEig
from .functional_calculus import operator_function_quadratic_form


[docs] class LinearBayesianInversion(LinearInversion): """ Solves a linear inverse problem using Bayesian methods. This class applies to problems of the form `d = A(u) + e`, where `u` is a Gaussian random variable representing the model prior, and `e` is a Gaussian random variable representing observation noise. It computes the exact posterior Gaussian measure `p(u|d)`, providing access to the posterior expectation, the posterior covariance operator, and an efficient exact-sampling mechanism using the randomize-then-optimize technique. """ # ========================================================================= # Initialization & Core Properties # ========================================================================= def __init__( self, forward_problem: LinearForwardProblem, model_prior_measure: GaussianMeasure, /, *, formalism: Literal["model_space", "data_space"] = "data_space", ) -> None: """ Initializes the linear Bayesian inversion problem. Args: forward_problem: The forward problem linking the model to the data, containing the forward operator `A` and data error measure `R`. model_prior_measure: The prior Gaussian measure `Q` on the model space. formalism: The algebraic space in which the normal equations are assembled and solved. Must be 'model_space' or 'data_space'. Defaults to 'data_space'. Raises: ValueError: If an invalid formalism string is provided, or if the 'model_space' formalism is selected but the necessary inverse covariance operators (precision operators) are not set. """ super().__init__(forward_problem, formalism=formalism) self._model_prior_measure: GaussianMeasure = model_prior_measure if self.formalism == "model_space": if not self.model_prior_measure.inverse_covariance_set: raise ValueError( "Prior inverse covariance must be set for model_space formalism." ) if ( self.forward_problem.data_error_measure_set and not self.forward_problem.data_error_measure.inverse_covariance_set ): raise ValueError( "Data error inverse covariance must be set for model_space formalism." )
[docs] def with_formalism( self, formalism: Literal["model_space", "data_space"] ) -> LinearBayesianInversion: """ Returns a new instance of the Bayesian inversion using the specified formalism. Args: formalism: The algebraic space in which the normal equations should be assembled and solved. Must be 'model_space' or 'data_space'. Returns: A new LinearBayesianInversion instance sharing the exact same forward problem and prior measure, but configured to use the specified formalism. """ return type(self)( self.forward_problem, self.model_prior_measure, formalism=formalism )
@property def model_prior_measure(self) -> GaussianMeasure: """The prior Gaussian measure on the model space.""" return self._model_prior_measure @property def data_prior_measure(self) -> GaussianMeasure: """ The prior predictive distribution on the data space. This represents the expected distribution of data before observation. """ return self.data_measure_from_model_measure(self.model_prior_measure) @property def joint_prior_measure(self) -> GaussianMeasure: """ The joint prior distribution of both the model and the data. """ return self.joint_measure(self.model_prior_measure) # ========================================================================= # Core Algebra & Normal Equations # ========================================================================= @property def normal_operator(self) -> LinearOperator: """ Constructs the Bayesian Normal operator for the chosen formalism. For 'data_space': Returns `N = A Q A* + R` For 'model_space': Returns `N = Q^-1 + A* R^-1 A` Returns: A LinearOperator representing the normal equations matrix. """ forward_operator = self.forward_problem.forward_operator if self.formalism == "data_space": model_prior_covariance = self.model_prior_measure.covariance if self.forward_problem.data_error_measure_set: return ( forward_operator @ model_prior_covariance @ forward_operator.adjoint + self.forward_problem.data_error_measure.covariance ) else: return ( forward_operator @ model_prior_covariance @ forward_operator.adjoint ) else: # model_space prior_inv_cov = self.model_prior_measure.inverse_covariance if self.forward_problem.data_error_measure_set: data_inv_cov = ( self.forward_problem.data_error_measure.inverse_covariance ) return ( prior_inv_cov + forward_operator.adjoint @ data_inv_cov @ forward_operator ) else: return prior_inv_cov + forward_operator.adjoint @ forward_operator def _compute_data_residual(self, data: Vector) -> Vector: """ Computes the exact data residual shifted by the prior and noise expectations. v = d - A(mu_u) - mu_e """ data_space = self.data_space A = self.forward_problem.forward_operator v_data = data_space.copy(data) if not self.model_prior_measure.has_zero_expectation: v_data = data_space.subtract( v_data, A(self.model_prior_measure.expectation) ) if ( self.forward_problem.data_error_measure_set and not self.forward_problem.data_error_measure.has_zero_expectation ): v_data = data_space.subtract( v_data, self.forward_problem.data_error_measure.expectation ) return v_data
[docs] def get_normal_equations_rhs(self, data: Vector) -> Vector: """ Computes the exact right-hand side vector (v) of the normal equations N * w = v for a given observed data vector, automatically accounting for non-zero prior and noise expectations. """ forward_operator = self.forward_problem.forward_operator shifted_data = self._compute_data_residual(data) if self.formalism == "data_space": # In data space, the RHS is exactly the shifted data residuals return shifted_data else: # In model space, the RHS is projected back via A* R^-1 if self.forward_problem.data_error_measure_set: data_inv_cov = ( self.forward_problem.data_error_measure.inverse_covariance ) return forward_operator.adjoint(data_inv_cov(shifted_data)) else: return forward_operator.adjoint(shifted_data)
[docs] def kalman_operator( self, solver: LinearSolver, /, *, preconditioner: Optional[LinearOperator] = None, ) -> LinearOperator: """ Constructs the Kalman gain operator `K`. The Kalman gain maps data residuals to model space updates. For 'data_space': `K = Q A* (A Q A* + R)^-1` For 'model_space': `K = (Q^-1 + A* R^-1 A)^-1 A* R^-1` Args: solver: The LinearSolver used to invert the normal operator. preconditioner: Optional preconditioner for iterative solvers. Returns: A LinearOperator representing the Kalman gain. """ forward_operator = self.forward_problem.forward_operator normal_operator = self.normal_operator if isinstance(solver, IterativeLinearSolver): inverse_normal_operator = solver( normal_operator, preconditioner=preconditioner ) else: inverse_normal_operator = solver(normal_operator) if self.formalism == "data_space": model_prior_covariance = self.model_prior_measure.covariance return ( model_prior_covariance
[docs] @ forward_operator.adjoint @ inverse_normal_operator ) else: # model_space if self.forward_problem.data_error_measure_set: data_inv_cov = ( self.forward_problem.data_error_measure.inverse_covariance ) return inverse_normal_operator @ forward_operator.adjoint @ data_inv_cov else: return inverse_normal_operator @ forward_operator.adjoint
# ========================================================================= # Posterior Extraction # ========================================================================= def model_posterior_measure( self, data: Vector, solver: LinearSolver, /, *, preconditioner: Optional[LinearOperator] = None, ) -> GaussianMeasure: """ Computes and returns the posterior Gaussian measure `p(u|d)`. This method applies the Kalman update equations to find the posterior expectation and covariance. If both the prior and data error measures have sampling enabled, it automatically constructs a randomize-then-optimize exact sampling function for the posterior. Args: data: The observed data vector. solver: A linear solver for inverting the normal operator. preconditioner: An optional preconditioner for iterative solvers. Returns: A GaussianMeasure representing the posterior distribution. """ model_space = self.model_space data_space = self.data_space forward_operator = self.forward_problem.forward_operator model_prior_covariance = self.model_prior_measure.covariance # 1. Resolve Inverse Normal Operator & Kalman Gain normal_operator = self.normal_operator if isinstance(solver, IterativeLinearSolver): inverse_normal_operator = solver( normal_operator, preconditioner=preconditioner ) else: inverse_normal_operator = solver(normal_operator) if self.formalism == "data_space": kalman_gain = ( model_prior_covariance @ forward_operator.adjoint @ inverse_normal_operator ) # C_post = C_u - K A C_u covariance = model_prior_covariance - ( kalman_gain @ forward_operator @ model_prior_covariance ) else: # model_space if self.forward_problem.data_error_measure_set: data_inv_cov = ( self.forward_problem.data_error_measure.inverse_covariance ) kalman_gain = ( inverse_normal_operator @ forward_operator.adjoint @ data_inv_cov ) else: kalman_gain = inverse_normal_operator @ forward_operator.adjoint # Optimization: In model space, the inverted normal operator IS the posterior covariance covariance = inverse_normal_operator # 2. Compute Posterior Mean shifted_data = self._compute_data_residual(data) mean_update = kalman_gain(shifted_data) if self.model_prior_measure.has_zero_expectation: expectation = mean_update else: expectation = model_space.add( self.model_prior_measure.expectation, mean_update ) # 3. Set up Posterior Sampling (Randomize-then-Optimize) can_sample_prior = self.model_prior_measure.sample_set can_sample_noise = ( not self.forward_problem.data_error_measure_set or self.forward_problem.data_error_measure.sample_set ) if can_sample_prior and can_sample_noise: def sample(): # a. Sample Prior (u) model_sample = self.model_prior_measure.sample() # b. Calculate deterministic residual (v - Bu) prediction = forward_operator(model_sample) data_residual = data_space.subtract(data, prediction) # c. Subtract full noise sample (v - Bu - z) if self.forward_problem.data_error_measure_set: noise_sample = self.forward_problem.data_error_measure.sample() data_residual = data_space.subtract(data_residual, noise_sample) # d. Update with Kalman gain (u + K * residual) correction = kalman_gain(data_residual) return model_space.add(model_sample, correction) return GaussianMeasure( covariance=covariance, expectation=expectation, sample=sample ) else: return GaussianMeasure(covariance=covariance, expectation=expectation)
[docs] def posterior_expectation_operator( self, solver: LinearSolver, /, *, preconditioner: Optional[LinearOperator] = None, ) -> Union[LinearOperator, AffineOperator]: """ Constructs the operator mapping observed data to the posterior expectation. The mapping evaluates F(d) = mu_u + K(d - A(mu_u) - mu_e). If the prior and data error measures both have a zero expectation, this mapping is purely linear and returns the Kalman gain operator directly. Otherwise, it regroups the terms into an AffineOperator: F(d) = K(d) + (mu_u - K(A(mu_u) + mu_e)). Args: solver: The LinearSolver used to invert the normal operator. preconditioner: Optional preconditioner for iterative solvers. Returns: A LinearOperator (if expectations are zero) or an AffineOperator. """ kalman_gain = self.kalman_operator(solver, preconditioner=preconditioner) zero_prior_mean = self.model_prior_measure.has_zero_expectation zero_error_mean = ( not self.forward_problem.data_error_measure_set or self.forward_problem.data_error_measure.has_zero_expectation ) # Strictly linear case if zero_prior_mean and zero_error_mean: return kalman_gain data_space = self.data_space model_space = self.model_space A = self.forward_problem.forward_operator # 1. Compute the baseline shift in data space: A(mu_u) + mu_e if zero_prior_mean and not zero_error_mean: data_shift = self.forward_problem.data_error_measure.expectation elif not zero_prior_mean and zero_error_mean: data_shift = A(self.model_prior_measure.expectation) else: data_shift = data_space.add( A(self.model_prior_measure.expectation), self.forward_problem.data_error_measure.expectation, ) # 2. Compute the translation vector in model space: mu_u - K(data_shift) k_shift = kalman_gain(data_shift) if zero_prior_mean: translation = model_space.negative(k_shift) else: translation = model_space.subtract( self.model_prior_measure.expectation, k_shift ) return AffineOperator(linear_part=kalman_gain, translation=translation)
[docs] def normal_residual_callback( self, data: Vector, /, *, message: str = "CG Iteration: {iter} | Normal Residual: {res:.3e}", print_progress: bool = True, ) -> ResidualTrackingCallback: """ Generates a ResidualTrackingCallback pre-configured to track the convergence of the Bayesian normal equations for the given data vector. Args: data: The observed data vector. message: The formatting string for printing progress. print_progress: If True, prints the message to stdout at each iteration. Returns: A configured ResidualTrackingCallback ready to be passed to a solver. """ rhs = self.get_normal_equations_rhs(data) return ResidualTrackingCallback( operator=self.normal_operator, y=rhs, print_progress=print_progress, message=message, )
# ========================================================================= # Evidence & Diagnostics # =========================================================================
[docs] def mahalanobis_evidence_term( self, data: Vector, solver: LinearSolver, /, *, preconditioner: Optional[LinearOperator] = None, ) -> float: """ Computes the data-dependent Mahalanobis term of the log-evidence. This value represents the optimal, penalty-balanced data misfit. It is equivalent to the unnormalized log-posterior evaluated at the posterior expectation. Mathematically, for a shifted data residual vector v = d - A(mu_u) - mu_e, the term evaluates the quadratic form: Misfit = <v, (A Q A* + R)^-1 v> In the 'model_space' formalism, this is computed far more efficiently using the Woodbury matrix identity to bypass the massive data-space inversion: Misfit = <v, R^-1 v> - <A* R^-1 v, (Q^-1 + A* R^-1 A)^-1 A* R^-1 v> Args: data: The observed data vector 'd'. solver: The LinearSolver used to invert the normal operator. preconditioner: An optional preconditioner to accelerate iterative solvers. Returns: float: The scalar Mahalanobis distance. Raises: ValueError: If the forward problem lacks a defined data error measure. """ if not self.forward_problem.data_error_measure_set: raise ValueError("Data error measure must be set.") data_space = self.data_space model_space = self.model_space R_measure = self.forward_problem.data_error_measure # 1. Compute the raw shifted data residual v_data = self._compute_data_residual(data) # 2. Solve the normal equations normal_op = self.normal_operator rhs = self.get_normal_equations_rhs(data) if isinstance(solver, IterativeLinearSolver): w = solver.solve_linear_system(normal_op, preconditioner, rhs, None) else: w = solver(normal_op)(rhs) # 3. Extract the Mahalanobis distance based on formalism if self.formalism == "data_space": # Misfit = <v_data, (A Q A* + R)^-1 v_data> mahalanobis_term = data_space.inner_product(v_data, w) else: # Woodbury optimization for model_space. R_inv = R_measure.inverse_covariance misfit_base = data_space.inner_product(v_data, R_inv(v_data)) misfit_reduction = model_space.inner_product(rhs, w) mahalanobis_term = misfit_base - misfit_reduction return float(mahalanobis_term)
def _trace_log_slq( self, op: LinearOperator, space: HilbertSpace, size_estimate: int, method: Literal["variable", "fixed"], max_samples: Optional[int], rtol: float, block_size: int, lanczos_degree: int, lanczos_rtol: Optional[float], parallel: bool, n_jobs: int, ) -> float: """ Internal matrix-free engine for estimating Tr(ln(A)) using Stochastic Lanczos Quadrature (SLQ). This relies on the identity ln(|A|) = Tr(ln(A)). It combines a Hutchinson stochastic trace estimator (outer loop) with the Lanczos process (inner loop) to evaluate the matrix logarithm quadratic forms <z, ln(A) z> without ever assembling A. Args: op: The self-adjoint, positive-definite operator to evaluate. space: The Hilbert space on which the operator acts. size_estimate: Initial number of Hutchinson probe vectors (z). method: 'variable' to sample until 'rtol' is met; 'fixed' otherwise. max_samples: Hard limit on Hutchinson probe vectors. rtol: Relative tolerance for the outer Hutchinson trace estimator. block_size: Number of probes to evaluate between convergence checks. lanczos_degree: The maximum Krylov subspace dimension (k) per probe. lanczos_rtol: Relative tolerance for early stopping in the Lanczos loop. If None, forces exactly 'lanczos_degree' iterations. parallel: If True, evaluates the independent probe vectors concurrently. n_jobs: Number of CPU cores to utilize. Returns: float: The estimated trace of the matrix logarithm. """ def log_func(x: np.ndarray) -> np.ndarray: return np.log(np.maximum(x, 1e-15)) if hasattr(space, "squared_norms"): inv_norms = 1.0 / np.sqrt(space.squared_norms) else: inv_norms = np.array( [ 1.0 / np.sqrt(space.squared_norm(space.basis_vector(i))) for i in range(space.dim) ] ) if max_samples is None: max_samples = space.dim num_samples = min(size_estimate, max_samples) def _compute_sample_sum(n_samples_to_draw: int) -> float: def _single_sample() -> float: c_z = np.random.choice([-1.0, 1.0], size=space.dim) * inv_norms z = space.from_components(c_z) return operator_function_quadratic_form( op, z, log_func, size_estimate=lanczos_degree, method="fixed" if lanczos_rtol is None else "variable", rtol=lanczos_rtol if lanczos_rtol is not None else 1e-3, ) if parallel: results = Parallel(n_jobs=n_jobs)( delayed(_single_sample)() for _ in range(n_samples_to_draw) ) return sum(results) else: return sum(_single_sample() for _ in range(n_samples_to_draw)) total_sum = _compute_sample_sum(num_samples) trace_est = total_sum / num_samples if num_samples > 0 else 0.0 if method == "variable" and num_samples < max_samples: while num_samples < max_samples: old_est = trace_est samples_to_add = min(block_size, max_samples - num_samples) new_sum = _compute_sample_sum(samples_to_add) total_sum += new_sum total_samples = num_samples + samples_to_add trace_est = total_sum / total_samples if abs(trace_est) > 0: error = abs(trace_est - old_est) / abs(trace_est) if error < rtol: break num_samples = total_samples return float(trace_est)
[docs] def estimate_log_determinant( self, /, *, operator_type: Literal["data_space", "model_space"] = "data_space", size_estimate: int = 10, method: Literal["variable", "fixed"] = "variable", max_samples: Optional[int] = None, rtol: float = 1e-2, block_size: int = 5, lanczos_degree: int = 40, lanczos_rtol: Optional[float] = 1e-3, parallel: bool = False, n_jobs: int = -1, ) -> float: """ Estimates the log-determinant of the Bayesian normal operator using Stochastic Lanczos Quadrature (SLQ). This acts as a public interface for computing the log-determinant of either the data-space normal operator (A Q A* + R) or the model-space normal operator (Q^-1 + A* R^-1 A). It securely resolves the correct algebraic space and delegates to the internal matrix-free SLQ engine. Args: operator_type: The target normal operator ('data_space' or 'model_space'). size_estimate: Initial number of Hutchinson samples (probe vectors). method: 'variable' to sample until 'rtol' is met, 'fixed' otherwise. max_samples: Hard limit on the number of Hutchinson samples. rtol: Relative tolerance for the Hutchinson trace estimate. block_size: Number of new samples per iteration in the 'variable' method. lanczos_degree: Maximum Krylov dimension (k) per probe vector. lanczos_rtol: Relative tolerance for dynamic Lanczos truncation. If None, uses fixed 'lanczos_degree' steps. parallel: If True, evaluates probe vectors in parallel. n_jobs: Number of CPU cores to use if parallel=True. Returns: float: The estimated log-determinant ln(|N|). """ surrogate = self.with_formalism(operator_type) space = ( surrogate.model_space if operator_type == "model_space" else surrogate.data_space ) op = surrogate.normal_operator return self._trace_log_slq( op, space, size_estimate=size_estimate, method=method, max_samples=max_samples, rtol=rtol, block_size=block_size, lanczos_degree=lanczos_degree, lanczos_rtol=lanczos_rtol, parallel=parallel, n_jobs=n_jobs, )
[docs] def log_evidence( self, data: Vector, solver: LinearSolver, /, *, preconditioner: Optional[LinearOperator] = None, size_estimate: int = 10, method: Literal["variable", "fixed"] = "variable", max_samples: Optional[int] = None, rtol: float = 1e-2, block_size: int = 5, lanczos_degree: int = 40, lanczos_rtol: Optional[float] = 1e-3, parallel: bool = False, n_jobs: int = -1, ) -> float: """ Computes the approximate log marginal likelihood (evidence) of the data, ln(p(d)). The log-evidence is a critical metric for Bayesian model selection, allowing users to quantitatively compare different prior assumptions or forward models. It evaluates the likelihood of the observed data marginalized over all possible model states. The computation evaluates the Gaussian marginal density equation: ln p(d) = -0.5 * [ ln|N_d| + <v, N_d^-1 v> + M * ln(2*pi) ] Args: data: The observed data vector 'd'. solver: The LinearSolver used to invert the normal operator for the Mahalanobis misfit term. preconditioner: Optional preconditioner for the iterative solver. size_estimate: Initial number of SLQ Hutchinson samples. method: 'variable' to dynamically bound SLQ error, 'fixed' otherwise. max_samples: Hard limit on SLQ Hutchinson samples. rtol: Relative tolerance for the SLQ trace estimate. block_size: Number of new SLQ samples per iteration. lanczos_degree: Maximum Krylov dimension for SLQ. lanczos_rtol: Relative tolerance for dynamic Lanczos truncation. parallel: If True, evaluates SLQ probe vectors in parallel. n_jobs: Number of CPU cores to use if parallel=True. Returns: float: The estimated log-evidence ln(p(d)). Raises: ValueError: If the 'model_space' formalism is used but no data error measure has been set on the forward problem. """ mahalanobis = self.mahalanobis_evidence_term( data, solver, preconditioner=preconditioner ) slq_kwargs = { "size_estimate": size_estimate, "method": method, "max_samples": max_samples, "rtol": rtol, "block_size": block_size, "lanczos_degree": lanczos_degree, "lanczos_rtol": lanczos_rtol, "parallel": parallel, "n_jobs": n_jobs, } if self.formalism == "data_space": log_det = self.estimate_log_determinant( operator_type="data_space", **slq_kwargs ) else: # By Sylvester's determinant identity: # ln|N_d| = ln|N_m| + ln|Q| + ln|R| log_det_Nm = self.estimate_log_determinant( operator_type="model_space", **slq_kwargs ) log_det_Q = self._trace_log_slq( self.model_prior_measure.covariance, self.model_space, **slq_kwargs ) if not self.forward_problem.data_error_measure_set: raise ValueError( "Data error measure is required to compute log evidence." ) log_det_R = self._trace_log_slq( self.forward_problem.data_error_measure.covariance, self.data_space, **slq_kwargs, ) log_det = log_det_Nm + log_det_Q + log_det_R m = self.data_space.dim constant = m * np.log(2 * np.pi) return -0.5 * (log_det + mahalanobis + constant)
# ========================================================================= # Preconditioners # =========================================================================
[docs] def diagonal_normal_preconditioner( self, /, *, blocks: Optional[List[List[int]]] = None, parallel: bool = False, n_jobs: int = -1, ) -> LinearOperator: """ Constructs a diagonal preconditioner specifically for the data-space Bayesian normal operator `(A Q A* + R)`. This exploits the identity `<v, A Q A* v> = <A* v, Q A* v>`. If blocks of data indices are provided, it acts on the averaged basis vector for each block to compute a robust representative regional variance, requiring only one adjoint action of the forward operator per block. Args: blocks: An optional list of lists, where each sub-list contains indices of data points grouped together. Must perfectly partition the data space. parallel: If True, computes the adjoint actions in parallel. n_jobs: Number of parallel jobs to use. -1 means all available cores. Returns: A DiagonalSparseMatrixLinearOperator representing the inverse of the approximated normal operator. Raises: ValueError: If the inversion was initialized with `formalism='model_space'`, as this preconditioner is mathematically invalid for that normal operator. """ if self.formalism != "data_space": raise ValueError( "This custom preconditioner is mathematically derived for the " "data-space normal operator (A Q A* + R) and cannot be used " "with the model-space formalism." ) data_space = self.data_space model_space = self.model_space A_adj = self.forward_problem.forward_operator.adjoint Q = self.model_prior_measure.covariance data_dim = data_space.dim if blocks is not None: flattened_indices = [idx for block in blocks for idx in block] if ( len(flattened_indices) != data_dim or len(set(flattened_indices)) != data_dim ): raise ValueError( f"The provided blocks must exactly partition the data space. " f"Expected {data_dim} unique indices, but got {len(flattened_indices)} " f"total indices with {len(set(flattened_indices))} unique." ) if min(flattened_indices) < 0 or max(flattened_indices) >= data_dim: raise ValueError("Block indices are out of bounds for the data space.") blocks_to_compute = blocks else: blocks_to_compute = [[i] for i in range(data_dim)] def compute_block_aqa_diag(block: List[int]) -> float: n = len(block) c = np.zeros(data_dim) c[block] = 1.0 / n v = data_space.from_components(c) f = A_adj(v) Q_f = Q(f) return model_space.inner_product(f, Q_f) if parallel: computed_vals = Parallel(n_jobs=n_jobs)( delayed(compute_block_aqa_diag)(block) for block in blocks_to_compute ) else: computed_vals = [ compute_block_aqa_diag(block) for block in blocks_to_compute ] aqa_diag = np.zeros(data_dim) for block, val in zip(blocks_to_compute, computed_vals): aqa_diag[block] = val if self.forward_problem.data_error_measure_set: r_diag = ( self.forward_problem.data_error_measure.covariance.extract_diagonal( galerkin=True, parallel=parallel, n_jobs=n_jobs ) ) normal_diag = aqa_diag + r_diag else: normal_diag = aqa_diag approx_normal_op = DiagonalSparseMatrixLinearOperator.from_diagonal_values( data_space, data_space, normal_diag, galerkin=True ) return approx_normal_op.inverse
[docs] def sparse_localized_preconditioner( self, interacting_blocks: list[list[int]], rank: int = 10, parallel: bool = False, n_jobs: int = -1, ) -> LinearOperator: """ Builds a sparse preconditioner specifically for the data-space Bayesian normal equations using randomized Nystrom approximations on localized, potentially overlapping sub-blocks. Args: interacting_blocks: A list of lists, where each sub-list contains the indices of data points that strongly couple to each other. rank: The rank of the randomized Nystrom approximation to use per block. parallel: If True, computes the sub-block approximations in parallel. n_jobs: Number of CPU cores to use if parallel=True (-1 uses all cores). Returns: A LinearOperator representing the inverse of the sparse approximation. Raises: ValueError: If the inversion was initialized with `formalism='model_space'`, as this preconditioner is mathematically invalid for that normal operator. """ if self.formalism != "data_space": raise ValueError( "This custom preconditioner is mathematically derived for the " "data-space normal operator (A Q A* + R) and cannot be used " "with the model-space formalism." ) forward_op = self.forward_problem.forward_operator prior_cov = self.model_prior_measure.covariance data_space = forward_op.codomain noise_variance = ( self.forward_problem.data_error_measure.covariance.extract_diagonal( parallel=parallel, n_jobs=n_jobs, galerkin=True ) if self.forward_problem.data_error_measure_set else np.zeros(self.data_space.dim) ) core_normal_op = forward_op @ prior_cov @ forward_op.adjoint data_dim = data_space.dim euclidean_full = EuclideanSpace(data_dim) to_components_op = data_space.coordinate_projection print( f"Building sparse preconditioner over {len(interacting_blocks)} blocks (Rank {rank})..." ) def _process_block(indices: list[int]): block_size = len(indices) idx_array = np.array(indices) restrict_coords = euclidean_full.subspace_projection(indices) P = restrict_coords @ to_components_op P_star = P.adjoint H_local = P @ core_normal_op @ P_star actual_rank = min(rank, block_size) eig_approx = LowRankEig.from_randomized( H_local, actual_rank, method="fixed" ) V = eig_approx.u_factor.matrix(dense=True, galerkin=True) Lambda = eig_approx.eigenvalues M_local_array = (V * Lambda) @ V.T I_local, J_local = np.meshgrid( np.arange(block_size), np.arange(block_size), indexing="ij" ) I_global = idx_array[I_local.flatten()] J_global = idx_array[J_local.flatten()] V_flattened = M_local_array.flatten() return I_global, J_global, V_flattened row_indices, col_indices, values = [], [], [] if parallel: results = Parallel(n_jobs=n_jobs)( delayed(_process_block)(indices) for indices in interacting_blocks ) for I_glob, J_glob, V_flat in results: row_indices.extend(I_glob) col_indices.extend(J_glob) values.extend(V_flat) else: for indices in interacting_blocks: I_glob, J_glob, V_flat = _process_block(indices) row_indices.extend(I_glob) col_indices.extend(J_glob) values.extend(V_flat) H_sparse = sps.coo_matrix( (values, (row_indices, col_indices)), shape=(data_dim, data_dim) ) R_sparse = sps.diags(noise_variance) H_approx = (H_sparse + R_sparse).tocsc() print("Factorizing sparse matrix...") splu_solver = splinalg.splu(H_approx) def apply_preconditioner(x): c = data_space.to_components(x) c_solved = splu_solver.solve(c) return data_space.from_components(c_solved) return LinearOperator( data_space, data_space, apply_preconditioner, adjoint_mapping=apply_preconditioner, )
[docs] def woodbury_data_preconditioner( self, solver: LinearSolver, /, *, prior_solver: Optional[LinearSolver] = None, noise_solver: Optional[LinearSolver] = None, ) -> LinearOperator: """ Constructs a data-space preconditioner using the Woodbury matrix identity. Data Space Normal Operator: N_d = A Q A* + R Woodbury Identity: N_d^-1 = R^-1 - R^-1 A (Q^-1 + A* R^-1 A)^-1 A* R^-1 Note: This method assumes the prior (Q) and noise (R) measures either already have well-defined inverse covariances, or are well-conditioned enough to be inverted by the provided solvers. If your prior is an unbounded operator on a function space, use `Q.with_regularized_inverse()` before passing it to the inversion. Args: solver: The LinearSolver used to invert the inner Woodbury operator (N_m). prior_solver: An optional solver used to explicitly invert the prior covariance (Q) if its inverse is not already set. Defaults to `solver` if not provided. noise_solver: An optional solver used to explicitly invert the data error covariance (R) if its inverse is not already set. Defaults to `solver` if not provided. Returns: A LinearOperator representing the Woodbury-approximated inverse. """ A = self.forward_problem.forward_operator Q = self.model_prior_measure if not self.forward_problem.data_error_measure_set: raise ValueError( "Data error measure must be set for the Woodbury identity." ) R = self.forward_problem.data_error_measure # 1. Invert the noise covariance R if R.inverse_covariance_set: R_inv = R.inverse_covariance else: r_solver = noise_solver if noise_solver is not None else solver R_inv = r_solver(R.covariance) # 2. Invert the prior covariance Q if Q.inverse_covariance_set: Q_inv = Q.inverse_covariance else: q_solver = prior_solver if prior_solver is not None else solver Q_inv = q_solver(Q.covariance) # 3. Assemble and invert the inner Woodbury operator (N_m) N_m = Q_inv + A.adjoint @ R_inv @ A N_m_inv = solver(N_m) # 4. Return the assembled preconditioner return R_inv - R_inv @ A @ N_m_inv @ A.adjoint @ R_inv
[docs] def woodbury_model_preconditioner( self, solver: LinearSolver, /, ) -> LinearOperator: """ Constructs a model-space preconditioner using the Woodbury matrix identity. Model Space Normal Operator: N_m = Q^-1 + A* R^-1 A Woodbury Identity: N_m^-1 = Q - Q A* (R + A Q A*)^-1 A Q Note: Unlike the data-space Woodbury identity, this formulation does not require evaluating the explicit inverses of the prior (Q) or noise (R) covariances, making it highly robust for unbounded or complex measures. Args: solver: The LinearSolver used to invert the inner Woodbury operator (N_d). Returns: A LinearOperator representing the Woodbury-approximated inverse. """ A = self.forward_problem.forward_operator Q = self.model_prior_measure.covariance if not self.forward_problem.data_error_measure_set: raise ValueError( "Data error measure must be set for the Woodbury identity." ) R = self.forward_problem.data_error_measure.covariance # 1. Assemble the inner data-space operator (N_d) N_d = R + A @ Q @ A.adjoint # 2. Invert the inner operator N_d_inv = solver(N_d) # 3. Assemble and return the preconditioner return Q - Q @ A.adjoint @ N_d_inv @ A @ Q
# ========================================================================= # Surrogates & Parameterization # =========================================================================
[docs] def surrogate_inversion( self, /, *, alternate_forward_operator: Optional[LinearOperator] = None, alternate_prior_measure: Optional[GaussianMeasure] = None, alternate_data_error_measure: Optional[GaussianMeasure] = None, ) -> LinearBayesianInversion: """ Constructs a surrogate Bayesian inversion problem using simplified physics, priors, or data errors. This is primarily used to construct robust, computationally cheap surrogate models to use as preconditioners for the full, complex inverse problem. Args: alternate_forward_operator: An optional simplified forward operator. alternate_prior_measure: An optional simplified prior measure. alternate_data_error_measure: An optional simplified data error measure. Returns: A new LinearBayesianInversion instance representing the surrogate problem. The surrogate inherits the `formalism` of the parent problem. Raises: ValueError: If the alternative operators/measures exist in incompatible domains/codomains. """ A_tilde = alternate_forward_operator or self.forward_problem.forward_operator Q_tilde = alternate_prior_measure or self.model_prior_measure if alternate_data_error_measure is not None: R_tilde = alternate_data_error_measure elif self.forward_problem.data_error_measure_set: R_tilde = self.forward_problem.data_error_measure else: R_tilde = None if A_tilde.domain != Q_tilde.domain: raise ValueError( "The domain of the alternate forward operator must match " "the domain of the prior measure." ) if A_tilde.codomain != self.data_space: raise ValueError( "The data space for the alternate forward operator must " "match that for the original problem" ) if R_tilde.domain != self.data_space: raise ValueError( "The domain for the alternate error measure must " "match that for the original problem" ) surrogate_forward_problem = LinearForwardProblem( A_tilde, data_error_measure=R_tilde ) # Inherit the formalism of the parent inversion return LinearBayesianInversion( surrogate_forward_problem, Q_tilde, formalism=self.formalism )
[docs] def surrogate_normal_preconditioner( self, solver: LinearSolver, /, *, alternate_forward_operator: Optional[LinearOperator] = None, alternate_prior_measure: Optional[GaussianMeasure] = None, alternate_data_error_measure: Optional[GaussianMeasure] = None, ) -> LinearOperator: """ Builds a preconditioner by exactly inverting the normal operator of a simplified surrogate inverse problem. Args: solver: The LinearSolver to use to exactly invert the surrogate normal operator. alternate_forward_operator: An optional simplified forward operator. alternate_prior_measure: An optional simplified prior measure. alternate_data_error_measure: An optional simplified data error measure. Returns: A LinearOperator representing the inverse of the surrogate normal equations. """ surrogate_inv = self.surrogate_inversion( alternate_forward_operator=alternate_forward_operator, alternate_prior_measure=alternate_prior_measure, alternate_data_error_measure=alternate_data_error_measure, ) return solver(surrogate_inv.normal_operator)
[docs] def surrogate_woodbury_data_preconditioner( self, solver: LinearSolver, /, *, alternate_forward_operator: Optional[LinearOperator] = None, alternate_prior_measure: Optional[GaussianMeasure] = None, alternate_data_error_measure: Optional[GaussianMeasure] = None, prior_solver: Optional[LinearSolver] = None, noise_solver: Optional[LinearSolver] = None, ) -> LinearOperator: """ Builds a data-space preconditioner by applying the Woodbury matrix identity to a simplified surrogate inverse problem. This method chains the construction of the surrogate model with the extraction of the Woodbury inverse in one step. Note: Ensure that any alternate measures provided have well-defined inverse covariances, or use `.with_regularized_inverse()` on them before passing them to this method. Args: solver: The LinearSolver used to invert the inner Woodbury operator. alternate_forward_operator: An optional simplified forward operator. alternate_prior_measure: An optional simplified prior measure. alternate_data_error_measure: An optional simplified data error measure. prior_solver: Optional solver for the prior covariance. noise_solver: Optional solver for the noise covariance. Returns: A LinearOperator representing the Woodbury-approximated inverse. """ surrogate_inv = self.surrogate_inversion( alternate_forward_operator=alternate_forward_operator, alternate_prior_measure=alternate_prior_measure, alternate_data_error_measure=alternate_data_error_measure, ) return surrogate_inv.woodbury_data_preconditioner( solver, prior_solver=prior_solver, noise_solver=noise_solver )
[docs] def surrogate_woodbury_model_preconditioner( self, solver: LinearSolver, /, *, alternate_forward_operator: Optional[LinearOperator] = None, alternate_prior_measure: Optional[GaussianMeasure] = None, alternate_data_error_measure: Optional[GaussianMeasure] = None, ) -> LinearOperator: """ Builds a model-space preconditioner by applying the Woodbury matrix identity to a simplified surrogate inverse problem. Args: solver: The LinearSolver used to invert the inner Woodbury operator. alternate_forward_operator: An optional simplified forward operator. alternate_prior_measure: An optional simplified prior measure. alternate_data_error_measure: An optional simplified data error measure. Returns: A LinearOperator representing the Woodbury-approximated inverse. """ surrogate_inv = self.surrogate_inversion( alternate_forward_operator=alternate_forward_operator, alternate_prior_measure=alternate_prior_measure, alternate_data_error_measure=alternate_data_error_measure, ) return surrogate_inv.woodbury_model_preconditioner(solver)
[docs] def low_rank_surrogate( self, /, *, forward_rank: Optional[int] = None, prior_rank: Optional[int] = None, data_error_rank: Optional[int] = None, forward_kwargs: Optional[dict] = None, prior_kwargs: Optional[dict] = None, data_error_kwargs: Optional[dict] = None, ) -> LinearBayesianInversion: """ Constructs a surrogate Bayesian inversion problem by replacing the exact physics and statistical measures with their low-rank approximations. This method generates computationally cheap surrogate models to be used in constructing preconditioners for massive, ill-conditioned inverse problems (e.g., using spectral or banded methods). The low-rank approximations are computed using randomized SVD and eigendecomposition algorithms. Args: forward_rank: Target rank for the randomized SVD of the forward operator. prior_rank: Target rank for the randomized eigendecomposition of the prior. data_error_rank: Target rank for the randomized eigendecomposition of the noise. forward_kwargs: Additional kwargs passed directly to `LinearOperator.random_svd`. prior_kwargs: Additional kwargs passed directly to `GaussianMeasure.low_rank_approximation`. data_error_kwargs: Additional kwargs passed directly to `GaussianMeasure.low_rank_approximation`. Returns: A LinearBayesianInversion representing the low-rank surrogate problem. """ A_tilde = None Q_tilde = None R_tilde = None if forward_rank is not None: f_kwargs = forward_kwargs or {} original_A = self.forward_problem.forward_operator A_tilde = LowRankSVD.from_randomized(original_A, forward_rank, **f_kwargs) if prior_rank is not None: p_kwargs = prior_kwargs or {} Q_tilde = self.model_prior_measure.low_rank_approximation( prior_rank, **p_kwargs ) if data_error_rank is not None: if not self.forward_problem.data_error_measure_set: raise ValueError("Cannot approximate data error measure: none is set.") d_kwargs = data_error_kwargs or {} R_tilde = self.forward_problem.data_error_measure.low_rank_approximation( data_error_rank, **d_kwargs ) return self.surrogate_inversion( alternate_forward_operator=A_tilde, alternate_prior_measure=Q_tilde, alternate_data_error_measure=R_tilde, )
[docs] def parameterized_inversion( self, parameterization: LinearOperator, /, *, parameter_prior: Optional[GaussianMeasure] = None, dense: bool = False, parallel: bool = False, n_jobs: int = -1, formalism: Optional[Literal["model_space", "data_space"]] = None, ) -> LinearBayesianInversion: """ Constructs a parameterized surrogate of the Bayesian inversion. If the target formalism resolves to 'model_space' (which is typical for parameterized inversions), the parameter prior's covariance matrix will be automatically densified to explicitly compute the required precision (inverse covariance) operator. Args: parameterization: A LinearOperator mapping from the parameter space to the full model space. parameter_prior: An optional prior measure on the parameter space. If not provided, the original model prior is pulled back to the parameter space automatically. dense: If True, computes and stores operators as dense matrices. parallel: If True, computes the dense matrices in parallel. n_jobs: Number of CPU cores to use. -1 means all available. formalism: An optional override for the formalism of the new inversion. If None, inherits the formalism of the parent inversion. Returns: A new LinearBayesianInversion instance operating on the parameter space. """ target_formalism = formalism or self.formalism if parameter_prior is None: parameter_prior = self.model_prior_measure.affine_mapping( operator=parameterization.adjoint ) if parameter_prior.domain != parameterization.domain: raise ValueError( "The domain of the parameter prior must match the domain " "of the parameterization operator." ) new_fp = self.forward_problem.parameterized_problem( parameterization, dense=dense, parallel=parallel, n_jobs=n_jobs ) new_prior = parameter_prior if dense or target_formalism == "model_space": new_prior = new_prior.with_dense_covariance( parallel=parallel, n_jobs=n_jobs ) return type(self)(new_fp, new_prior, formalism=target_formalism)
[docs] def data_reduced_inversion( self, reduction_operator: LinearOperator, /, *, reduced_data_error_measure: Optional[GaussianMeasure] = None, dense: bool = False, parallel: bool = False, n_jobs: int = -1, formalism: Optional[Literal["model_space", "data_space"]] = None, ) -> LinearBayesianInversion: """ Constructs a surrogate of the Bayesian inversion using a reduced data space. Args: reduction_operator: A LinearOperator mapping from the current data space to the new, reduced data space. reduced_data_error_measure: An optional data error measure defined on the reduced data space. dense: If True, computes and stores operators as dense matrices. parallel: If True, computes the dense matrices in parallel. n_jobs: Number of CPU cores to use. -1 means all available. formalism: An optional override for the formalism of the new inversion. Returns: A new LinearBayesianInversion instance operating on the reduced data space. """ target_formalism = formalism or self.formalism new_fp = self.forward_problem.data_reduced_problem( reduction_operator, reduced_data_error_measure=reduced_data_error_measure, dense=dense, parallel=parallel, n_jobs=n_jobs, ) new_prior = self.model_prior_measure if dense or target_formalism == "model_space": new_prior = new_prior.with_dense_covariance( parallel=parallel, n_jobs=n_jobs ) return type(self)(new_fp, new_prior, formalism=target_formalism)