Source code for pygeoinf.convex_optimisation

"""
Convex optimisation utilities for non-smooth problems.

This module provides a minimal subgradient descent implementation suitable
for learning and experimentation. It assumes the objective is a NonLinearForm
that can provide a subgradient oracle via form.subgradient(x).
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, List, Tuple, TYPE_CHECKING, runtime_checkable, Protocol

import numpy as np
from scipy.optimize import minimize

from .nonlinear_forms import NonLinearForm
from .convex_analysis import BallSupportFunction, EllipsoidSupportFunction

if TYPE_CHECKING:
    from .hilbert_space import HilbertSpace, Vector
    from .convex_analysis import SupportFunction
    from .linear_operators import LinearOperator


[docs] @dataclass class SubgradientResult: """Result from subgradient descent optimisation. Attributes: x_best: Best point found (lowest function value). f_best: Best function value found. x_final: Final iterate (may differ from x_best). f_final: Final function value. num_iterations: Number of iterations performed. converged: Whether convergence criterion was met. function_values: History of function values at each iteration. iterates: Optional history of all iterates (memory intensive). """ x_best: "Vector" f_best: float x_final: "Vector" f_final: float num_iterations: int converged: bool function_values: List[float] iterates: Optional[List["Vector"]] = None
[docs] class SubgradientDescent: """ Basic subgradient descent for minimising non-smooth convex functions. Algorithm: x_{k+1} = x_k - α * g_k where g_k ∈ ∂f(x_k) is a subgradient (obtained via oracle.subgradient(x_k)). This implementation uses CONSTANT step size α for all k. Convergence is not guaranteed with constant step size; use for learning/testing only. Parameters: oracle: A NonLinearForm with subgradient() method returning subgradients. step_size: Constant step size α > 0. max_iterations: Maximum number of iterations. store_iterates: Whether to store full history (memory intensive). stagnation_window: Optional number of iterations without improvement to declare convergence. """ def __init__( self, oracle: NonLinearForm, /, *, step_size: float, max_iterations: int = 500, store_iterates: bool = False, stagnation_window: Optional[int] = None, ) -> None: if not isinstance(oracle, NonLinearForm): raise ValueError("oracle must be a NonLinearForm") if step_size <= 0: raise ValueError("step_size must be positive") if max_iterations <= 0: raise ValueError("max_iterations must be positive") if stagnation_window is not None and stagnation_window <= 0: raise ValueError("stagnation_window must be positive if provided") self._oracle = oracle self._step_size = float(step_size) self._max_iterations = int(max_iterations) self._store_iterates = bool(store_iterates) self._stagnation_window = stagnation_window @property def oracle(self) -> NonLinearForm: return self._oracle @property def domain(self) -> "HilbertSpace": return self._oracle.domain
[docs] def solve(self, x0: "Vector") -> SubgradientResult: """Run subgradient descent from initial point x0.""" if not self.domain.is_element(x0): raise ValueError("x0 must be an element of the oracle domain") if not self._oracle.has_subgradient: raise ValueError("oracle must provide a subgradient") x = x0 f_best = float("inf") x_best = x0 function_values: List[float] = [] iterates: Optional[List["Vector"]] = [] if self._store_iterates else None no_improve = 0 converged = False for _ in range(self._max_iterations): f_x = self._oracle(x) function_values.append(float(f_x)) if f_x < f_best: f_best = float(f_x) x_best = x no_improve = 0 else: no_improve += 1 if self._stagnation_window is not None: if no_improve >= self._stagnation_window: converged = True break if iterates is not None: iterates.append(x) g = self._oracle.subgradient(x) step = self.domain.multiply(self._step_size, g) x = self.domain.subtract(x, step) f_final = float(self._oracle(x)) return SubgradientResult( x_best=x_best, f_best=f_best, x_final=x, f_final=f_final, num_iterations=len(function_values), converged=converged, function_values=function_values, iterates=iterates, )
# ============================================================================= # Phase 1: Bundle method core data structures # =============================================================================
[docs] @dataclass class Cut: """A linearisation cut for a convex function at a point. A cut records the function value and a subgradient at an evaluation point, defining the affine lower bound: f_j + <g_j, lambda - x_j> <= f(lambda) for all lambda. Attributes: x: Evaluation point (a Hilbert-space vector). f: Function value f(x). g: Subgradient g in partial_f(x). iteration: Iteration index at which this cut was generated. """ x: "Vector" f: float g: "Vector" iteration: int
[docs] class Bundle: """Collection of cutting-plane linearisations of a convex function. A bundle stores a list of :class:`Cut` objects and provides utilities for building the piecewise-linear epigraph model: hat_phi(lambda) = max_j [ f_j + <g_j, lambda - x_j> ] used by proximal and level bundle methods. Examples: >>> space = EuclideanSpace(3) >>> bundle = Bundle() >>> bundle.add_cut(Cut(x=np.zeros(3), f=1.0, g=np.ones(3), iteration=0)) >>> len(bundle) 1 """ def __init__(self) -> None: self._cuts: List[Cut] = []
[docs] def add_cut(self, cut: Cut) -> None: """Append *cut* to the bundle. Args: cut: The :class:`Cut` to add. """ self._cuts.append(cut)
def __len__(self) -> int: """Return the number of cuts currently stored.""" return len(self._cuts)
[docs] def lower_bound(self) -> float: """Return a placeholder lower bound (-infinity). The true lower bound min_lambda hat_phi(lambda) is computed as the master QP/LP objective value by the outer bundle solver (Phase 2–3). This placeholder allows :class:`BundleResult` to report ``f_low`` before any master problem has been solved. Returns: ``-np.inf`` """ return -np.inf
[docs] def upper_bound(self) -> float: """Return the best (lowest) function value seen so far. This is a valid upper bound on the optimum since f* <= f(x_j) for all recorded points x_j. Returns: min_j f_j. Raises: ValueError: If the bundle is empty. """ if not self._cuts: raise ValueError("Bundle is empty; no upper bound available.") return min(cut.f for cut in self._cuts)
[docs] def best_point(self) -> "Vector": """Return the evaluation point with the lowest recorded function value. Returns: The vector x_j achieving min_j f_j. Raises: ValueError: If the bundle is empty. """ if not self._cuts: raise ValueError("Bundle is empty; no best point available.") return min(self._cuts, key=lambda c: c.f).x
[docs] def linearization_matrix( self, stability_center: "Vector", domain: "HilbertSpace", ) -> Tuple[np.ndarray, np.ndarray]: """Build the inequality constraint matrix for the epigraph QP. For each cut (x_j, f_j, g_j) the cutting-plane constraint is: f_j + <g_j, lambda - x_j> <= t which is equivalent to: g_j^T @ lambda - t <= g_j^T @ x_j - f_j Stacking all cuts gives the system A_ineq @ [lambda; t] <= b where row i is [g_i^T, -1] and b_i = g_i^T @ x_i - f_i. Args: stability_center: Current stability/proximal centre (not used to build the constraint data, but kept for API consistency with Phase 2). domain: The Hilbert space whose ``to_components`` converts vectors to flat numpy arrays. Returns: A tuple ``(A_ineq, b_ineq)`` where * ``A_ineq`` has shape ``(n_cuts, dim + 1)`` — columns are [lambda_1, ..., lambda_d, t]. * ``b_ineq`` has shape ``(n_cuts,)``. Raises: ValueError: If the bundle is empty. """ if not self._cuts: raise ValueError("Bundle is empty; cannot build linearization matrix.") d = domain.dim n = len(self._cuts) A = np.empty((n, d + 1)) b = np.empty(n) for i, cut in enumerate(self._cuts): g_c = domain.to_components(cut.g) x_c = domain.to_components(cut.x) A[i, :d] = g_c A[i, d] = -1.0 b[i] = float(np.dot(g_c, x_c)) - cut.f return A, b
[docs] def compress(self, max_size: int) -> None: """Discard all but the *max_size* most recent cuts. Args: max_size: Maximum number of cuts to retain. If the bundle already has fewer cuts than *max_size*, nothing is changed. """ if len(self._cuts) > max_size: self._cuts = self._cuts[-max_size:]
# --------------------------------------------------------------------------- # QP abstraction # ---------------------------------------------------------------------------
[docs] @dataclass class QPResult: """Result from a quadratic programme solve. Attributes: x: Solution vector (component array). obj: Objective value at ``x``. status: ``'solved'`` on success; a descriptive failure message otherwise. """ x: np.ndarray obj: float status: str
[docs] @runtime_checkable class QPSolver(Protocol): """Protocol for QP solvers used by bundle methods. Solvers must implement the OSQP standard form: min_x (1/2) x^T @ P @ x + q^T @ x subject to: l <= A @ x <= u Args: P: Symmetric positive-semi-definite Hessian, shape ``(n, n)``. q: Linear cost, shape ``(n,)``. A: Constraint matrix, shape ``(m, n)``. l: Lower bounds, shape ``(m,)``; use ``-np.inf`` for one-sided. u: Upper bounds, shape ``(m,)``; use ``+np.inf`` for one-sided. x0: Optional warm-start primal solution. Returns: A :class:`QPResult`. """
[docs] def solve(
self, P: np.ndarray, q: np.ndarray, A: np.ndarray, l: np.ndarray, u: np.ndarray, x0: Optional[np.ndarray] = None, ) -> QPResult: ...
[docs] class SciPyQPSolver: """QP solver backed by :func:`scipy.optimize.minimize` with ``method='SLSQP'``. Implements the :class:`QPSolver` protocol. Converts the OSQP standard-form bounds $l ≤ Ax ≤ u$ to SLSQP inequality/equality constraints. Notes: SLSQP is a gradient-based method suitable for small to medium problems (up to a few hundred variables). For large-scale bundle QPs use an OSQP- or Clarabel-backed solver (Phase 4). """
[docs] def solve( self, P: np.ndarray, q: np.ndarray, A: np.ndarray, l: np.ndarray, u: np.ndarray, x0: Optional[np.ndarray] = None, ) -> QPResult: """Solve the QP and return a :class:`QPResult`. Args: P: Symmetric PSD Hessian of shape ``(n, n)``. q: Linear cost vector of shape ``(n,)``. A: Constraint matrix of shape ``(m, n)``. l: Lower-bound vector of shape ``(m,)``. u: Upper-bound vector of shape ``(m,)``. x0: Optional warm-start point of shape ``(n,)``. Returns: :class:`QPResult` with ``status='solved'`` on success. """ n = q.shape[0] if x0 is None: x0 = np.zeros(n) def objective(x: np.ndarray) -> float: return 0.5 * float(x @ P @ x) + float(q @ x) def gradient(x: np.ndarray) -> np.ndarray: return P @ x + q constraints = [] for i in range(A.shape[0]): row = A[i] li = l[i] ui = u[i] finite_l = np.isfinite(li) finite_u = np.isfinite(ui) if finite_l and finite_u and li == ui: # Equality constraint: A[i] @ x == li def _eq(x, row=row, val=li): return float(row @ x) - val constraints.append({"type": "eq", "fun": _eq}) else: if finite_u: # A[i] @ x <= u[i] → u[i] - A[i]@x >= 0 def _ub(x, row=row, val=ui): return val - float(row @ x) constraints.append({"type": "ineq", "fun": _ub}) if finite_l: # A[i] @ x >= l[i] → A[i]@x - l[i] >= 0 def _lb(x, row=row, val=li): return float(row @ x) - val constraints.append({"type": "ineq", "fun": _lb}) result = minimize( fun=objective, x0=x0, jac=gradient, method="SLSQP", constraints=constraints, options={"ftol": 1e-9, "maxiter": 1000}, ) status = "solved" if result.success else result.message return QPResult(x=result.x, obj=float(result.fun), status=status)
[docs] class OSQPQPSolver: """QP solver using OSQP (ADMM-based). Requires ``pip install osqp``. Supports warm-starting via the *x0* parameter. A fresh OSQP instance is created for every :meth:`solve` call to avoid stale state. Args: eps_abs: Absolute feasibility tolerance (default ``1e-6``). eps_rel: Relative feasibility tolerance (default ``1e-6``). verbose: Whether OSQP prints solver output (default ``False``). polish: Whether to apply polishing step for higher accuracy (default ``True``). max_iter: Maximum number of ADMM iterations (default ``10000``). Raises: ImportError: If the ``osqp`` package is not installed. """ def __init__( self, *, eps_abs: float = 1e-6, eps_rel: float = 1e-6, verbose: bool = False, polish: bool = True, max_iter: int = 10000, ) -> None: try: import osqp as _osqp # noqa: F401 except ImportError as exc: raise ImportError( "OSQPQPSolver requires the 'osqp' package. " "Install it with: pip install osqp" ) from exc self._eps_abs = eps_abs self._eps_rel = eps_rel self._verbose = verbose self._polish = polish self._max_iter = max_iter
[docs] def solve( self, P: np.ndarray, q: np.ndarray, A: np.ndarray, l: np.ndarray, u: np.ndarray, x0: Optional[np.ndarray] = None, ) -> QPResult: """Solve the QP using OSQP and return a :class:`QPResult`. Args: P: Symmetric PSD Hessian of shape ``(n, n)``. q: Linear cost vector of shape ``(n,)``. A: Constraint matrix of shape ``(m, n)``. l: Lower-bound vector of shape ``(m,)``; ``-np.inf`` for one-sided. u: Upper-bound vector of shape ``(m,)``; ``+np.inf`` for one-sided. x0: Optional warm-start primal solution of shape ``(n,)``. Returns: :class:`QPResult` with ``status='solved'`` on success. """ import osqp import scipy.sparse as sp # OSQP uses 1e30 to represent infinity. _INF = 1e30 l_osqp = np.where(np.isneginf(l), -_INF, l) u_osqp = np.where(np.isposinf(u), _INF, u) P_sparse = sp.csc_matrix(P) A_sparse = sp.csc_matrix(A) prob = osqp.OSQP() prob.setup( P_sparse, q, A_sparse, l_osqp, u_osqp, verbose=self._verbose, eps_abs=self._eps_abs, eps_rel=self._eps_rel, polishing=self._polish, max_iter=self._max_iter, warm_starting=True, ) if x0 is not None: prob.warm_start(x=x0) results = prob.solve() raw_status: str = results.info.status status = "solved" if "solved" in raw_status.lower() else raw_status return QPResult( x=results.x, obj=float(results.info.obj_val), status=status, )
[docs] class ClarabelQPSolver: """QP solver using Clarabel (interior-point). Requires ``pip install clarabel``. Converts the OSQP standard form $l ≤ Ax ≤ u$ to Clarabel's cone form internally. Equality constraints ($l_i = u_i$) are handled via :class:`clarabel.ZeroConeT`; inequality constraints via :class:`clarabel.NonnegativeConeT`. Args: verbose: Whether Clarabel prints solver output (default ``False``). max_iter: Maximum interior-point iterations (default ``200``). eps_abs: Absolute convergence tolerance (default ``1e-8``). eps_rel: Relative convergence tolerance (default ``1e-8``). Raises: ImportError: If the ``clarabel`` package is not installed. """ def __init__( self, *, verbose: bool = False, max_iter: int = 200, eps_abs: float = 1e-8, eps_rel: float = 1e-8, ) -> None: try: import clarabel as _clarabel # noqa: F401 except ImportError as exc: raise ImportError( "ClarabelQPSolver requires the 'clarabel' package. " "Install it with: pip install clarabel" ) from exc self._verbose = verbose self._max_iter = max_iter self._eps_abs = eps_abs self._eps_rel = eps_rel
[docs] def solve( self, P: np.ndarray, q: np.ndarray, A: np.ndarray, l: np.ndarray, u: np.ndarray, x0: Optional[np.ndarray] = None, ) -> QPResult: """Solve the QP using Clarabel and return a :class:`QPResult`. Args: P: Symmetric PSD Hessian of shape ``(n, n)``. q: Linear cost vector of shape ``(n,)``. A: Constraint matrix of shape ``(m, n)``. l: Lower-bound vector of shape ``(m,)``; ``-np.inf`` for one-sided. u: Upper-bound vector of shape ``(m,)``; ``+np.inf`` for one-sided. x0: Warm-start hint (ignored if API unavailable). Returns: :class:`QPResult` with ``status='solved'`` on success. Notes: Clarabel's constraint form is $Ax + s = b$, $s ∈ K$. For :class:`~clarabel.NonnegativeConeT` this means $b - Ax ≥ 0$, i.e. $Ax ≤ b$. Each OSQP row is therefore expanded into at most two Clarabel rows. """ import clarabel import scipy.sparse as sp m = A.shape[0] eq_rows: list[np.ndarray] = [] eq_b: list[float] = [] ineq_rows: list[np.ndarray] = [] ineq_b: list[float] = [] for i in range(m): li, ui = l[i], u[i] row = A[i] if np.isfinite(li) and np.isfinite(ui) and li == ui: # Equality: A[i]x = li → ZeroCone eq_rows.append(row) eq_b.append(float(li)) else: if np.isfinite(ui): # A[i]x <= ui → ui - A[i]x >= 0 → NonnegativeCone ineq_rows.append(row) ineq_b.append(float(ui)) if np.isfinite(li): # A[i]x >= li → A[i]x - li >= 0 → -(-A[i])x - li <= 0 # Clarabel: b - Ax >= 0 so row is -A[i], b is -li ineq_rows.append(-row) ineq_b.append(float(-li)) n_eq = len(eq_rows) n_ineq = len(ineq_rows) all_rows = eq_rows + ineq_rows all_b = np.array(eq_b + ineq_b, dtype=float) A_cl = sp.csc_matrix( np.array(all_rows, dtype=float).reshape(n_eq + n_ineq, A.shape[1]) ) cones = [] if n_eq > 0: cones.append(clarabel.ZeroConeT(n_eq)) if n_ineq > 0: cones.append(clarabel.NonnegativeConeT(n_ineq)) P_sparse = sp.csc_matrix(P, dtype=float) q_arr = np.asarray(q, dtype=float) settings = clarabel.DefaultSettings() settings.verbose = self._verbose settings.max_iter = self._max_iter settings.tol_gap_abs = self._eps_abs settings.tol_gap_rel = self._eps_rel settings.tol_feas = self._eps_abs solver = clarabel.DefaultSolver(P_sparse, q_arr, A_cl, all_b, cones, settings) result = solver.solve() x = np.asarray(result.x, dtype=float) raw_status = str(result.status) status = "solved" if "solved" in raw_status.lower() else raw_status obj = float(0.5 * x @ P @ x + q_arr @ x) return QPResult(x=x, obj=obj, status=status)
[docs] def best_available_qp_solver() -> "QPSolver": """Return the best available QP solver (OSQP > Clarabel > SciPy). Tries solvers in order of preference: OSQP (ADMM, fast for large-scale), then Clarabel (interior-point, high accuracy), then the SciPy SLSQP fallback. Returns: A :class:`QPSolver` instance backed by the best installed package. """ try: return OSQPQPSolver() except ImportError: pass try: return ClarabelQPSolver() except ImportError: pass return SciPyQPSolver()
# --------------------------------------------------------------------------- # BundleResult # ---------------------------------------------------------------------------
[docs] @dataclass class BundleResult: """Result from a bundle method optimisation run. Attributes: x_best: Best primal iterate found (lowest function value). f_best: Function value at ``x_best``; upper bound on the optimum. f_low: Lower bound on the optimum from the cutting-plane model. gap: Optimality gap ``f_best - f_low`` (non-negative for a valid lower bound). converged: Whether the gap tolerance was satisfied. num_iterations: Number of bundle loop iterations (master solves), excluding the initial oracle evaluation before the loop. num_serious_steps: Number of serious (descent) steps taken. function_values: History of function values at each iteration. iterates: Optional history of all iterates (memory intensive). """ x_best: "Vector" f_best: float f_low: float gap: float converged: bool num_iterations: int num_serious_steps: int function_values: List[float] iterates: Optional[List["Vector"]] = None
# --------------------------------------------------------------------------- # Proximal Bundle Method # --------------------------------------------------------------------------- def _get_value_and_subgradient( oracle: NonLinearForm, x: "Vector" ) -> "tuple[float, Vector]": """Duck-typed value + subgradient query. If *oracle* has a ``value_and_subgradient`` method (e.g. :class:`~pygeoinf.backus_gilbert.DualMasterCostFunction`) it is called to share computation. Otherwise the value and subgradient are obtained via separate ``oracle(x)`` and ``oracle.subgradient(x)`` calls. Args: oracle: A :class:`NonLinearForm` with a subgradient oracle. x: The query point. Returns: ``(f, g)`` — scalar value and subgradient vector. """ if ( hasattr(oracle, "value_and_subgradient") and callable(oracle.value_and_subgradient) ): return oracle.value_and_subgradient(x) return oracle(x), oracle.subgradient(x)
[docs] class ProximalBundleMethod: """Proximal bundle method for minimising a non-smooth convex function. Solves: min_{lambda in D} f(lambda) where f is a convex function accessible through a value + subgradient oracle (a :class:`~pygeoinf.nonlinear_forms.NonLinearForm` with ``subgradient``). At each iteration the *master QP* is: min_{lambda, t} t + (rho / 2) ||lambda - lambda_hat||^2 subject to: f_j + <g_j, lambda - x_j> <= t for all j in bundle where lambda_hat is the current *stability centre* and rho > 0 is the proximal weight. A *serious step* is taken whenever the new oracle value f(lambda_+) < f(lambda_hat); otherwise a *null step* occurs and rho is increased to tighten the proximal term. Args: oracle: Non-smooth convex functional with subgradient oracle. rho0: Initial proximal weight rho > 0. rho_factor: Multiplicative factor applied to rho on null steps (divide on serious steps). tolerance: Convergence tolerance; terminates when the duality gap f_up - f_low <= tolerance. max_iterations: Maximum number of oracle calls. bundle_size: Maximum number of cuts retained in the bundle. store_iterates: If ``True``, all iterates are stored in :attr:`BundleResult.iterates`. qp_solver: QP solver implementing :class:`QPSolver`. Defaults to :class:`SciPyQPSolver` if ``None``. Examples: >>> from pygeoinf.hilbert_space import EuclideanSpace >>> from pygeoinf.nonlinear_forms import NonLinearForm >>> import numpy as np >>> domain = EuclideanSpace(1) >>> f = lambda x: float(x[0]**2 + 2*x[0]) >>> g = lambda x: np.array([2*x[0] + 2.0]) >>> oracle = NonLinearForm(domain, f, subgradient=g) >>> solver = ProximalBundleMethod(oracle, tolerance=1e-5) >>> result = solver.solve(domain.from_components(np.array([2.0]))) >>> np.testing.assert_allclose(domain.to_components(result.x_best), [-1.0], atol=1e-3) """ def __init__( self, oracle: NonLinearForm, /, *, rho0: float = 1.0, rho_factor: float = 2.0, tolerance: float = 1e-6, max_iterations: int = 500, bundle_size: int = 100, store_iterates: bool = False, qp_solver: Optional["QPSolver"] = None, ) -> None: if rho0 <= 0: raise ValueError("rho0 must be positive") if rho_factor <= 1.0: raise ValueError("rho_factor must be > 1") if tolerance <= 0: raise ValueError("tolerance must be positive") if max_iterations < 1: raise ValueError("max_iterations must be >= 1") if bundle_size < 1: raise ValueError("bundle_size must be >= 1") self._oracle = oracle self._rho0 = rho0 self._rho_factor = rho_factor self._tolerance = tolerance self._max_iterations = max_iterations self._bundle_size = bundle_size self._store_iterates = store_iterates self._qp_solver: QPSolver = qp_solver if qp_solver is not None else SciPyQPSolver() def _solve_master( self, bundle: Bundle, lam_hat: "Vector", rho: float, domain: "HilbertSpace", x0_comps: np.ndarray, t_warm: float = 0.0, ) -> "tuple[Vector, float]": """Solve the proximal bundle master QP. Minimises: t + (rho / 2) ||lambda - lambda_hat||^2 subject to the bundle cutting-plane constraints. Args: bundle: Current bundle of cuts. lam_hat: Stability centre $\\hat{λ}$. rho: Proximal weight. domain: Hilbert space of the decision variable. x0_comps: Warm-start components for the QP solver. Returns: ``(lam_next, qp_obj)`` — optimal point (as a Hilbert-space element) and the QP objective value. """ d = domain.dim lam_hat_c = domain.to_components(lam_hat) # Hessian: rho * I_{d} in the λ block, 0 for t P = np.zeros((d + 1, d + 1)) P[:d, :d] = rho * np.eye(d) # Linear cost: -rho * lam_hat for λ components, 1 for t q_vec = np.zeros(d + 1) q_vec[:d] = -rho * lam_hat_c q_vec[d] = 1.0 # Inequality constraints from bundle: A_ineq @ z <= b_ineq A_ineq, b_ineq = bundle.linearization_matrix(lam_hat, domain) n_cuts = A_ineq.shape[0] l_bounds = np.full(n_cuts, -np.inf) u_bounds = b_ineq # Warm start: use t_warm (should be feasible, e.g. f_hat) to help SLSQP x0 = np.append(x0_comps, t_warm) result = self._qp_solver.solve(P, q_vec, A_ineq, l_bounds, u_bounds, x0=x0) lam_next_c = result.x[:d] t_opt = float(result.x[d]) # the t variable at the QP solution lam_next = domain.from_components(lam_next_c) return lam_next, t_opt
[docs] def solve(self, x0: "Vector") -> BundleResult: """Run the proximal bundle method starting from *x0*. Args: x0: Initial point in the domain of the oracle. Returns: A :class:`BundleResult` summarising the optimisation run. """ domain = self._oracle.domain lam_hat = x0 f_hat, g_hat = _get_value_and_subgradient(self._oracle, lam_hat) f_up = f_hat best_lam = lam_hat f_low = -np.inf bundle = Bundle() bundle.add_cut(Cut(x=lam_hat, f=f_hat, g=g_hat, iteration=0)) rho = self._rho0 n_serious = 0 function_values: List[float] = [f_hat] iterates: Optional[List["Vector"]] = [] if self._store_iterates else None lam_hat_c = domain.to_components(lam_hat) t_warm = f_hat # feasible warm-start for the t variable for k in range(self._max_iterations): # Solve master QP — returns (lam_next, t_opt) where t_opt = result.x[d] # is the cutting-plane model value at lam_next: hat_phi(lam_next) <= f(lam_next). lam_next, t_opt = self._solve_master( bundle, lam_hat, rho, domain, lam_hat_c, t_warm=t_warm ) lam_next_c = domain.to_components(lam_next) # Oracle evaluation f_next, g_next = _get_value_and_subgradient(self._oracle, lam_next) function_values.append(f_next) if iterates is not None: iterates.append(lam_next) # Update best if f_next < f_up: f_up = f_next best_lam = lam_next # ------------------------------------------------------------------ # Step classification: serious or null # ------------------------------------------------------------------ if f_next < f_hat: # Serious step: accept lam_next as new stability centre. # Reset the lower bound — the old f_low was measured relative to # the previous stability centre and may be meaningless now. lam_hat = lam_next lam_hat_c = lam_next_c f_hat = f_next n_serious += 1 rho = rho / self._rho_factor f_low = -np.inf # reset; will be re-established after a null step else: # Null step: tighten proximal weight. # t_opt = hat_phi(lam_next) is a valid cutting-plane lower bound # relative to the current stability centre. Update monotonically. rho = rho * self._rho_factor f_low = max(f_low, t_opt) # Add cut and compress bundle bundle.add_cut(Cut(x=lam_next, f=f_next, g=g_next, iteration=k + 1)) bundle.compress(self._bundle_size) t_warm = f_hat # always a feasible warm-start # ------------------------------------------------------------------ # Convergence check (after null-step lower-bound update) # ------------------------------------------------------------------ # gap = f_hat - f_low measures how tight the cutting-plane model is # at the stability centre. When small, the model is nearly exact and # lam_hat is near-optimal (f_hat ≈ f* and 0 ∈ ∂f(lam_hat)). if f_low > -np.inf: gap = f_hat - f_low if gap <= self._tolerance: return BundleResult( x_best=best_lam, f_best=f_up, f_low=f_low, gap=gap, converged=True, num_iterations=k + 1, num_serious_steps=n_serious, function_values=function_values, iterates=iterates, ) # Max iterations reached without convergence gap = f_hat - f_low if f_low > -np.inf else np.inf return BundleResult( x_best=best_lam, f_best=f_up, f_low=f_low, gap=gap, converged=False, num_iterations=self._max_iterations, num_serious_steps=n_serious, function_values=function_values, iterates=iterates, )
# --------------------------------------------------------------------------- # Level Bundle Method # ---------------------------------------------------------------------------
[docs] class LevelBundleMethod: """Level bundle method for minimising a non-smooth convex function. Solves: min_{lambda in D} f(lambda) where f is a convex function accessible through a value + subgradient oracle (a :class:`~pygeoinf.nonlinear_forms.NonLinearForm` with ``subgradient``). At each iteration the *level master QP* is: min_{lambda, t} (1/2) ||lambda - lambda_hat||^2 subject to: f_j + <g_j, lambda - x_j> <= t for all j t <= f_lev where the level is: f_lev = alpha * f_low + (1 - alpha) * f_up, alpha in (0,1). The lower bound f_low is maintained as the LP optimal value of the cutting-plane model: f_LP = min_{lambda} hat_phi(lambda) = min_{lambda, t} t subject to: f_j + <g_j, lambda - x_j> <= t **Infeasibility handling.** If the level QP is infeasible (which can happen when f_lev < f_LP for a tight alpha), alpha is widened by a factor of 1.5 (capped at 0.9) for up to three attempts. If all attempts fail an emergency proximal step is taken (minimize t + (1/2) ||lambda - lambda_hat||^2 over the current bundle) so the stability centre and bundle are always updated. Args: oracle: Non-smooth convex functional with subgradient oracle. alpha: Level parameter alpha in (0, 1) controlling how aggressively the level is set towards the lower bound. Smaller values are more aggressive (risk infeasibility); larger are more conservative. Defaults to ``0.1``. tolerance: Convergence tolerance; terminates when the duality gap f_up - f_low <= tolerance. max_iterations: Maximum number of oracle calls. bundle_size: Maximum number of cuts retained in the bundle. store_iterates: If ``True``, all iterates are stored in :attr:`BundleResult.iterates`. qp_solver: QP solver implementing :class:`QPSolver`. Defaults to :class:`SciPyQPSolver` if ``None``. Examples: >>> from pygeoinf.hilbert_space import EuclideanSpace >>> from pygeoinf.nonlinear_forms import NonLinearForm >>> import numpy as np >>> domain = EuclideanSpace(1) >>> f = lambda x: float(x[0]**2 + 2*x[0]) >>> g = lambda x: np.array([2*x[0] + 2.0]) >>> oracle = NonLinearForm(domain, f, subgradient=g) >>> solver = LevelBundleMethod(oracle, tolerance=1e-5) >>> result = solver.solve(domain.from_components(np.array([2.0]))) >>> np.testing.assert_allclose(domain.to_components(result.x_best), [-1.0], atol=1e-3) """ def __init__( self, oracle: NonLinearForm, /, *, alpha: float = 0.1, tolerance: float = 1e-6, max_iterations: int = 500, bundle_size: int = 100, store_iterates: bool = False, qp_solver: Optional["QPSolver"] = None, ) -> None: if not (0.0 < alpha < 1.0): raise ValueError("alpha must be in (0, 1)") if tolerance <= 0: raise ValueError("tolerance must be positive") if max_iterations < 1: raise ValueError("max_iterations must be >= 1") if bundle_size < 1: raise ValueError("bundle_size must be >= 1") self._oracle = oracle self._alpha = alpha self._tolerance = tolerance self._max_iterations = max_iterations self._bundle_size = bundle_size self._store_iterates = store_iterates self._qp_solver: QPSolver = qp_solver if qp_solver is not None else SciPyQPSolver() # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _compute_lp_lower_bound( self, bundle: Bundle, lam_hat: "Vector", domain: "HilbertSpace", lam_hat_c: np.ndarray, ) -> float: """Compute a lower bound on f* via the bundle LP. Solves the cutting-plane LP: f_LP = min_{lambda, t} t subject to: f_j + <g_j, lambda - x_j> <= t ||lambda - lambda_hat||_inf <= R where R = 1e3 * (1 + ||lambda_hat||_inf) is a large box that keeps the LP bounded without biasing the lower bound. The LP is solved via :func:`scipy.optimize.linprog` (HiGHS simplex), which is more reliable than ADMM-based QP solvers for this nearly-unbounded pure-LP subproblem. Since hat_phi(lambda) <= f(lambda) for all lambda, the LP optimum satisfies f_LP <= f*, providing a valid global lower bound. Args: bundle: Current bundle of cuts. lam_hat: Stability centre $\\hat{λ}$. domain: Hilbert space of the decision variable. lam_hat_c: Component array of the stability centre. Returns: Scalar lower-bound estimate f_LP, or ``-np.inf`` if the LP cannot be solved reliably (e.g. only a single cut present when the problem is genuinely unbounded). """ from scipy.optimize import linprog d = domain.dim # LP variables: z = [lambda (d), t (1)] (d+1 variables) # Objective: min t → c = [0, ..., 0, 1] c = np.zeros(d + 1) c[d] = 1.0 # Cut constraints: A_ineq @ z <= b_ineq A_ub, b_ub = bundle.linearization_matrix(lam_hat, domain) # Box constraint: -R <= lambda_i - lam_hat_i <= R (no constraint on t) R = 1e3 * (1.0 + float(np.max(np.abs(lam_hat_c)))) lb = np.append(lam_hat_c - R, -np.inf) # lower var bounds ub = np.append(lam_hat_c + R, np.inf) # upper var bounds result = linprog( c, A_ub=A_ub, b_ub=b_ub, bounds=list(zip(lb, ub)), method="highs" ) if result.status == 0: return float(result.x[d]) return -np.inf # infeasible or unbounded — return conservative bound def _solve_level_master( self, bundle: Bundle, lam_hat: "Vector", f_lev: float, domain: "HilbertSpace", lam_hat_c: np.ndarray, t_warm: float = 0.0, ) -> QPResult: """Solve the level-bundle master QP. Minimises: (1/2) ||lambda - lambda_hat||^2 subject to the bundle cutting-plane constraints AND the level constraint t <= f_lev. Decision variable: z = [lambda_1, ..., lambda_d, t]. Args: bundle: Current bundle of cuts. lam_hat: Stability centre $\\hat{λ}$. f_lev: Level value; upper bound on $t$. domain: Hilbert space of the decision variable. lam_hat_c: Component array of the stability centre. t_warm: Warm-start value for the $t$ component. Returns: :class:`QPResult` from the QP solver. """ d = domain.dim # Objective: 0.5 ||λ - lam_hat||^2 = 0.5 λ'λ - lam_hat'λ + const # P = block_diag(I_d, 0), q = [-lam_hat_c, 0] P = np.zeros((d + 1, d + 1)) P[:d, :d] = np.eye(d) q_vec = np.zeros(d + 1) q_vec[:d] = -lam_hat_c # Cut constraints: A_ineq @ z <= b_ineq A_ineq, b_ineq = bundle.linearization_matrix(lam_hat, domain) n_cuts = A_ineq.shape[0] # Level constraint: t <= f_lev (i.e. [0...0, 1] @ z <= f_lev) A_level = np.zeros((1, d + 1)) A_level[0, d] = 1.0 A_full = np.vstack([A_ineq, A_level]) l_full = np.full(n_cuts + 1, -np.inf) u_full = np.append(b_ineq, f_lev) x0 = np.append(lam_hat_c, t_warm) return self._qp_solver.solve(P, q_vec, A_full, l_full, u_full, x0=x0) def _solve_emergency_proximal( self, bundle: Bundle, lam_hat: "Vector", domain: "HilbertSpace", lam_hat_c: np.ndarray, f_hat: float, ) -> "tuple[Vector, float]": """Emergency proximal fallback when all level QP attempts fail. Minimises: t + (1/2) ||lambda - lambda_hat||^2 subject to the bundle cutting-plane constraints (no level constraint). Args: bundle: Current bundle of cuts. lam_hat: Stability centre. domain: Hilbert space of the decision variable. lam_hat_c: Component array of the stability centre. f_hat: Function value at stability centre (used for warm start). Returns: ``(lam_next, t_opt)`` — next iterate and its cutting-plane value. """ d = domain.dim P = np.zeros((d + 1, d + 1)) P[:d, :d] = np.eye(d) q_vec = np.zeros(d + 1) q_vec[:d] = -lam_hat_c q_vec[d] = 1.0 # penalise t (same as proximal with rho=1) A_ineq, b_ineq = bundle.linearization_matrix(lam_hat, domain) n_cuts = A_ineq.shape[0] l_bounds = np.full(n_cuts, -np.inf) x0 = np.append(lam_hat_c, f_hat) result = self._qp_solver.solve(P, q_vec, A_ineq, l_bounds, b_ineq, x0=x0) lam_next = domain.from_components(result.x[:d]) t_opt = float(result.x[d]) return lam_next, t_opt # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def solve(self, x0: "Vector") -> BundleResult: """Run the level bundle method starting from *x0*. Args: x0: Initial point in the domain of the oracle. Returns: A :class:`BundleResult` summarising the optimisation run. """ domain = self._oracle.domain d = domain.dim lam_hat = x0 f_hat, g_hat = _get_value_and_subgradient(self._oracle, lam_hat) f_up = f_hat best_lam = lam_hat f_low = -np.inf bundle = Bundle() bundle.add_cut(Cut(x=lam_hat, f=f_hat, g=g_hat, iteration=0)) n_serious = 0 function_values: List[float] = [f_hat] iterates: Optional[List["Vector"]] = [] if self._store_iterates else None lam_hat_c = domain.to_components(lam_hat) for k in range(self._max_iterations): # ------------------------------------------------------------------ # Step 1: update LP lower bound from the cutting-plane model. # ------------------------------------------------------------------ f_lp = self._compute_lp_lower_bound(bundle, lam_hat, domain, lam_hat_c) f_low = max(f_low, f_lp) # ------------------------------------------------------------------ # Step 2: convergence check (gap between upper and lower bound). # ------------------------------------------------------------------ if f_low > -np.inf: gap = f_up - f_low if gap <= self._tolerance: return BundleResult( x_best=best_lam, f_best=f_up, f_low=f_low, gap=gap, converged=True, num_iterations=k, num_serious_steps=n_serious, function_values=function_values, iterates=iterates, ) # ------------------------------------------------------------------ # Step 3: solve the level QP with infeasibility recovery. # ------------------------------------------------------------------ lam_next: Optional["Vector"] = None alpha_try = self._alpha for attempt in range(3): if f_low > -np.inf: f_lev_try = alpha_try * f_low + (1.0 - alpha_try) * f_up else: f_lev_try = f_up t_warm = min(f_lev_try, f_hat) qp_res = self._solve_level_master( bundle, lam_hat, f_lev_try, domain, lam_hat_c, t_warm=t_warm ) if qp_res.status == "solved": lam_next = domain.from_components(qp_res.x[:d]) break # Widen level for next attempt alpha_try = min(alpha_try * 1.5, 0.9) if lam_next is None: # All level QP attempts failed — emergency proximal fallback. lam_next, _ = self._solve_emergency_proximal( bundle, lam_hat, domain, lam_hat_c, f_hat ) # ------------------------------------------------------------------ # Step 5: oracle evaluation at the new candidate point. # ------------------------------------------------------------------ f_next, g_next = _get_value_and_subgradient(self._oracle, lam_next) function_values.append(f_next) if iterates is not None: iterates.append(lam_next) # Update upper bound / best point. if f_next < f_up: f_up = f_next best_lam = lam_next # ------------------------------------------------------------------ # Step 6: serious or null step classification. # ------------------------------------------------------------------ lam_next_c = domain.to_components(lam_next) if f_next < f_hat: # Serious step: accept lam_next as new stability centre. lam_hat = lam_next lam_hat_c = lam_next_c f_hat = f_next n_serious += 1 # Add cut and compress bundle (always, regardless of step type). bundle.add_cut(Cut(x=lam_next, f=f_next, g=g_next, iteration=k + 1)) bundle.compress(self._bundle_size) # ------------------------------------------------------------------ # Max iterations reached without convergence. # ------------------------------------------------------------------ gap = f_up - f_low if f_low > -np.inf else np.inf return BundleResult( x_best=best_lam, f_best=f_up, f_low=f_low, gap=gap, converged=False, num_iterations=self._max_iterations, num_serious_steps=n_serious, function_values=function_values, iterates=iterates, )
# --------------------------------------------------------------------------- # Multi-direction batch helper # ---------------------------------------------------------------------------
[docs] def solve_support_values( cost, qs, solver, lambda0, *, warm_start: bool = True, n_jobs: int = 1, ): """Compute support function values for multiple directions. Solves the dual master minimisation for each direction q_i in qs, optionally warm-starting from the previous direction's solution. The support function of a set U evaluated at direction q is: h_U(q) = min_{lambda} f(q, lambda) where f(q, ·) is the dual master cost with direction q. Args: cost: DualMasterCostFunction instance with a ``set_direction`` method. qs: Directions to evaluate; either a list of Vectors or an ``np.ndarray`` of shape ``(p, prop_dim)``. solver: Bundle method solver (ProximalBundleMethod or LevelBundleMethod). lambda0: Initial lambda for the first direction (a Vector in the data space). warm_start: If ``True`` (default), each direction starts from the previous direction's optimal lambda. If ``False``, always start from ``lambda0``. n_jobs: Number of parallel jobs. ``1`` = fully sequential (warm starting works). ``>1`` = joblib Parallel (warm-starting across workers is disabled; each worker starts from ``lambda0``). Returns: values: ``np.ndarray`` of shape ``(p,)``, support values $h_U(q_i)$ for each direction. lambdas: ``list`` of length ``p``, optimal lambda for each direction. diagnostics: ``list`` of :class:`BundleResult` for each direction. Raises: ImportError: If ``n_jobs > 1`` and ``joblib`` is not installed (falls back to sequential with a warning instead of raising). """ if n_jobs > 1: try: import copy from joblib import Parallel, delayed def _solve_one(cost_copy, q, solver_copy, lam0): cost_copy.set_direction(q) # Ensure the solver uses cost_copy (not its own deep-copied # internal oracle, which would not have the updated direction). solver_copy._oracle = cost_copy result = solver_copy.solve(lam0) return result.f_best, result.x_best, result results_raw = Parallel(n_jobs=n_jobs)( delayed(_solve_one)( copy.deepcopy(cost), q, copy.deepcopy(solver), lambda0 ) for q in qs ) values_list, lambdas, diagnostics = zip(*results_raw) return np.array(values_list), list(lambdas), list(diagnostics) except ImportError: import warnings warnings.warn( "joblib is not installed; falling back to sequential execution.", RuntimeWarning, stacklevel=2, ) # fall through to sequential below # Sequential implementation (also used as joblib fallback). lam_current = lambda0 values_list = [] lambdas = [] diagnostics = [] for q in qs: cost.set_direction(q) result = solver.solve(lam_current if warm_start else lambda0) values_list.append(result.f_best) lambdas.append(result.x_best) diagnostics.append(result) if warm_start: lam_current = result.x_best return np.array(values_list), lambdas, diagnostics
# --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # KKT Primal Solver result # ---------------------------------------------------------------------------
[docs] @dataclass class KKTResult: """Result from :class:`PrimalKKTSolver`. Attributes: m: Optimal primal model vector $u^*$ in the model space. multipliers: KKT multipliers $(\\lambda^*, \\mu^*)$. $\\lambda^*$ enforces the model prior constraint and $\\mu^*$ enforces the data-fit constraint. Both are non-negative. converged: ``True`` if the root-finder converged to the required tolerance. num_iterations: Number of function evaluations used by the root-finder (or 1 for the closed-form branch). """ m: "Vector" multipliers: "tuple[float, float]" converged: bool num_iterations: int
# --------------------------------------------------------------------------- # Chambolle-Pock primal-dual algorithm # ---------------------------------------------------------------------------
[docs] @dataclass class ChambollePockResult: """Result from :class:`ChambollePockSolver`. Attributes: m: Primal variable m* in B (model space). v: Primal variable v* in V (data space). mu: Dual variable mu* in D (data space). Approximates the optimal Lagrange multiplier for the equality constraint G @ m + v = d_tilde. primal_dual_gap: Feasibility residual ||G @ m* + v* - d_tilde|| at termination. converged: ``True`` if ``primal_dual_gap < tolerance``. num_iterations: Number of iterations performed. """ m: "Vector" v: "Vector" mu: "Vector" primal_dual_gap: float converged: bool num_iterations: int
[docs] class ChambollePockSolver: r"""Solve the primal feasibility form of the dual master via Chambolle-Pock. Solves the constrained maximisation: h_U(q) = max_{m in B, v in V} <c, m> subject to: G @ m + v = d_tilde where c = T* @ q is the linear objective and the feasible set (B, V, G, d_tilde) is **fixed**, using the first-order primal-dual algorithm of Chambolle & Pock (2011). The saddle-point reformulation (with dual variable mu in D) is: min_{m in B, v in V} max_{mu} <G @ m + v - d_tilde, mu> - <c, m> with operator K = [G; I_D] : M x D -> D. **Convergence rate:** O(1/N) in the primal-dual gap when tau * sigma * ||K||^2 <= 1 (where ||K||^2 <= ||G||^2 + 1). This is particularly efficient when the objective c = T* @ q changes while the feasible set (B, V, G, d_tilde) remains fixed. Args: B: Support function for the model prior (B subset of M). V: Support function for the data error set (V subset of D). G: Forward operator G: M -> D. d_tilde: Observed data vector d_tilde in D. sigma: Dual step size. If ``None``, auto-selected from power iteration. tau: Primal step size. If ``None``, auto-selected from power iteration. theta: Over-relaxation parameter (default 1.0). max_iterations: Maximum number of iterations. tolerance: Convergence tolerance on feasibility residual ||G @ m + v - d_tilde||. References: Chambolle A. & Pock T. (2011). A First-Order Primal-Dual Algorithm for Convex Problems with Applications to Imaging. *Journal of Mathematical Imaging and Vision*, 40(1), 120–145. https://doi.org/10.1007/s10851-010-0251-1 """ def __init__( self, B: "SupportFunction", V: "SupportFunction", G: "LinearOperator", d_tilde: "Vector", /, *, sigma: Optional[float] = None, tau: Optional[float] = None, theta: float = 1.0, max_iterations: int = 1000, tolerance: float = 1e-6, ) -> None: self._B = B self._V = V self._G = G self._d_tilde = d_tilde self._theta = theta self._max_iterations = max_iterations self._tolerance = tolerance self._model_space = B.primal_domain self._data_space = G.codomain # Auto-compute step sizes if not provided if sigma is None or tau is None: _sigma, _tau = self._compute_step_sizes() self._sigma = sigma if sigma is not None else _sigma self._tau = tau if tau is not None else _tau else: self._sigma = sigma self._tau = tau def _compute_step_sizes(self) -> "tuple[float, float]": """Estimate ||G|| via 20 steps of power iteration. Returns: (sigma, tau) satisfying tau * sigma * ||K||^2 <= 0.99 where K = [G; I] so ||K||^2 <= ||G||^2 + 1. """ model_space = self._model_space G = self._G rng = np.random.default_rng(0) x_m = model_space.from_components( rng.standard_normal(model_space.dim) ) # Normalise initial vector n0 = model_space.norm(x_m) if n0 > 1e-14: x_m = model_space.multiply(1.0 / n0, x_m) eigenvalue_est = 1.0 for _ in range(20): y = G(x_m) # data space: G x_m z = G.adjoint(y) # model space: G^T G x_m eigenvalue_est = model_space.norm(z) # ||G^T G x_m|| -> sigma_max^2 if eigenvalue_est < 1e-14: eigenvalue_est = 0.0 break x_m = model_space.multiply(1.0 / eigenvalue_est, z) # eigenvalue_est ≈ ||G||^2 (dominant eigenvalue of G^T G) G_norm_est = float(np.sqrt(max(eigenvalue_est, 0.0))) # ||K||^2 <= ||G||^2 + 1 (K = [G; I]) K_norm = float(np.sqrt(G_norm_est ** 2 + 1.0)) * 1.01 # slight over-estimate step = 0.99 / K_norm return step, step def _proj_B(self, z: "Vector") -> "Vector": """Project $z$ onto the set $B$. Supports :class:`BallSupportFunction` (closed ball projection). Raises :class:`NotImplementedError` for other support function types. """ if isinstance(self._B, BallSupportFunction): center = self._B._center radius = self._B._radius H = self._model_space diff = H.subtract(z, center) dist = H.norm(diff) if dist <= radius: return z # proj_B(z) = c + r * (z - c) / ||z - c|| return H.add(center, H.multiply(radius / dist, diff)) elif isinstance(self._B, EllipsoidSupportFunction): raise NotImplementedError( "Projection onto EllipsoidSupportFunction is not yet implemented " "in ChambollePockSolver. Only BallSupportFunction is supported." ) else: raise NotImplementedError( f"ChambollePockSolver does not know how to project onto " f"{type(self._B).__name__}. Only BallSupportFunction is currently " "supported." ) def _proj_V(self, z: "Vector") -> "Vector": """Project $z$ onto the set $V$. Supports :class:`BallSupportFunction` (closed ball projection). Raises :class:`NotImplementedError` for other support function types. """ if isinstance(self._V, BallSupportFunction): center = self._V._center radius = self._V._radius H = self._data_space diff = H.subtract(z, center) dist = H.norm(diff) if dist <= radius: return z # proj_V(z) = c + r * (z - c) / ||z - c|| return H.add(center, H.multiply(radius / dist, diff)) elif isinstance(self._V, EllipsoidSupportFunction): raise NotImplementedError( "Projection onto EllipsoidSupportFunction is not yet implemented " "in ChambollePockSolver. Only BallSupportFunction is supported." ) else: raise NotImplementedError( f"ChambollePockSolver does not know how to project onto " f"{type(self._V).__name__}. Only BallSupportFunction is currently " "supported." )
[docs] def solve( self, c: "Vector", m0: "Optional[Vector]" = None, ) -> ChambollePockResult: r"""Run the Chambolle-Pock iterations for objective direction $c$. Solves .. math:: h_U = \max_{m \in B,\, v \in V}\; \langle c, m \rangle \quad\text{s.t.}\quad Gm + v = \tilde{d} Iteration (with $θ = 1$, $K = [G;\\ I]$): #. $μ^{n+1} = μ^n + σ(G\\bar{m}^n + \\bar{v}^n - \\tilde{d})$ #. $m^{n+1} = \\operatorname{proj}_B\\bigl(m^n - τ G^* μ^{n+1} + τ c\\bigr)$ #. $v^{n+1} = \\operatorname{proj}_V\\bigl(v^n - τ μ^{n+1}\\bigr)$ #. $\\bar{m}^{n+1} = m^{n+1} + θ(m^{n+1} - m^n)$, v_bar^{n+1} = v^{n+1} + theta * (v^{n+1} - v^n) Convergence is declared when the feasibility residual ||G @ m + v - d_tilde|| < tolerance. Args: c: Linear objective coefficient in model space (typically c = T* @ q). m0: Initial model vector. Defaults to zero. Returns: :class:`ChambollePockResult` containing the primal optimisers $(m^*, v^*)$, dual variable $μ^*$, feasibility gap, and convergence diagnostics. """ model_space = self._model_space data_space = self._data_space G = self._G d_tilde = self._d_tilde sigma = self._sigma tau = self._tau theta = self._theta m = m0 if m0 is not None else model_space.zero v = data_space.zero mu = data_space.zero m_bar = m v_bar = v for n in range(self._max_iterations): # --- Dual update -------------------------------------------------- # mu^{n+1} = mu^n + sigma * (G m_bar + v_bar - d_tilde) Gm_bar = G(m_bar) residual = data_space.subtract( data_space.add(Gm_bar, v_bar), d_tilde ) mu_new = data_space.add(mu, data_space.multiply(sigma, residual)) # --- Primal update m ---------------------------------------------- # m^{n+1} = proj_B(m - tau * G^* mu_new + tau * c) Gstar_mu = G.adjoint(mu_new) m_input = model_space.add( model_space.subtract(m, model_space.multiply(tau, Gstar_mu)), model_space.multiply(tau, c), ) m_new = self._proj_B(m_input) # --- Primal update v ---------------------------------------------- # v^{n+1} = proj_V(v - tau * mu_new) v_input = data_space.subtract(v, data_space.multiply(tau, mu_new)) v_new = self._proj_V(v_input) # --- Over-relaxation ---------------------------------------------- m_bar = model_space.add( m_new, model_space.multiply(theta, model_space.subtract(m_new, m)), ) v_bar = data_space.add( v_new, data_space.multiply(theta, data_space.subtract(v_new, v)), ) m, v, mu = m_new, v_new, mu_new # --- Convergence check ------------------------------------------- Gm_new = G(m) feas = data_space.norm( data_space.subtract(data_space.add(Gm_new, v), d_tilde) ) if feas < self._tolerance: return ChambollePockResult( m=m, v=v, mu=mu, primal_dual_gap=feas, converged=True, num_iterations=n + 1, ) # Maximum iterations reached Gm_final = G(m) feas_final = data_space.norm( data_space.subtract(data_space.add(Gm_final, v), d_tilde) ) return ChambollePockResult( m=m, v=v, mu=mu, primal_dual_gap=feas_final, converged=False, num_iterations=self._max_iterations, )
[docs] def solve_primal_feasibility( cost, qs: "list[Vector] | np.ndarray", cp_solver: ChambollePockSolver, ) -> np.ndarray: r"""Compute support values h_U(q_i) using the primal feasibility form. Solves one Chambolle-Pock problem for each direction q_i (using c = T* @ q_i), exploiting that the feasible set (B, V, G, d_tilde) is independent of q. The support value for direction q is: h_U(q) = max_{m in B, v in V} <T* @ q, m> subject to: G @ m + v = d_tilde = <T* @ q, m*(q)> where m*(q) is returned by :meth:`ChambollePockSolver.solve`. Args: cost: :class:`~pygeoinf.backus_gilbert.DualMasterCostFunction` holding references to T, G, and the model space. qs: Directions to evaluate; a list of Vectors (in the property space) or an ``np.ndarray`` of shape ``(p, prop_dim)``. cp_solver: Pre-configured :class:`ChambollePockSolver` for the problem. Returns: ``np.ndarray`` of shape ``(p,)`` with h_U(q_i) for each direction. """ model_space = cost._model_space T = cost._T values = [] for q in qs: c = T.adjoint(q) # c = T^* q (in model space) result = cp_solver.solve(c) h_value = model_space.inner_product(c, result.m) values.append(h_value) return np.array(values)
# --------------------------------------------------------------------------- # Smoothed Dual Master (Moreau-Yosida) # ---------------------------------------------------------------------------
[docs] class SmoothedDualMaster: """Smooth approximation of DualMasterCostFunction using Moreau-Yosida smoothing. Smooths the norm-type support functions with parameter epsilon, making the objective differentiable. Only supports :class:`BallSupportFunction` and :class:`EllipsoidSupportFunction`. The smoothed ball support is .. math:: σ_{B,\\varepsilon}(z) = ⟨ z, c ⟩ + r\\,\\sqrt{‖z‖^2 + \\varepsilon^2} and its gradient w.r.t. $z$ is .. math:: \\nabla_z σ_{B,\\varepsilon}(z) = c + r\\,\\frac{z}{\\sqrt{‖z‖^2 + \\varepsilon^2}} The smoothed ellipsoid support is .. math:: σ_{E,\\varepsilon}(z) = ⟨ z, c ⟩ + r\\,\\sqrt{⟨ z,\\, A^{-1}z ⟩ + \\varepsilon^2} and its gradient w.r.t. $z$ is .. math:: \\nabla_z σ_{E,\\varepsilon}(z) = c + r\\,\\frac{A^{-1}z}{\\sqrt{⟨ z,\\, A^{-1}z ⟩ + \\varepsilon^2}} The full smoothed objective and its gradient are .. math:: \\varphi_\\varepsilon(λ) = ⟨λ, \\tilde{d}⟩ + σ_{B,\\varepsilon}(T^*q - G^*λ) + σ_{V,\\varepsilon}(-λ) .. math:: \\nabla_λ \\varphi_\\varepsilon(λ) = \\tilde{d} - G\\,\\nabla_{z_1}σ_{B,\\varepsilon}(z_1) - \\nabla_{z_2}σ_{V,\\varepsilon}(z_2) where $z_1 = T^*q - G^*λ$ and $z_2 = -λ$. Args: cost: :class:`~pygeoinf.backus_gilbert.DualMasterCostFunction` instance. epsilon: Smoothing parameter ($> 0$). Smaller values give a better approximation but a larger Lipschitz constant $L = r‖G‖^2 / \\varepsilon$. Raises: NotImplementedError: If either support function is not a :class:`BallSupportFunction` or :class:`EllipsoidSupportFunction`. """ def __init__(self, cost: "object", epsilon: float) -> None: self._cost = cost if epsilon <= 0: raise ValueError("epsilon must be positive") self._epsilon = float(epsilon) # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _eval_support( self, z: "Vector", sigma: object ) -> "tuple[float, Vector]": """Dispatch to the appropriate smoothed value-and-gradient helper. Args: z: Argument vector in the primal domain of *sigma*. sigma: A support function (Ball or Ellipsoid). Returns: ``(value, grad)`` — smoothed scalar value and gradient w.r.t. *z*. Raises: NotImplementedError: For unsupported support function types. """ if isinstance(sigma, BallSupportFunction): return self._smoothed_ball_value_and_grad(z, sigma) if isinstance(sigma, EllipsoidSupportFunction): return self._smoothed_ellipsoid_value_and_grad(z, sigma) raise NotImplementedError( f"SmoothedDualMaster only supports BallSupportFunction and " f"EllipsoidSupportFunction; got {type(sigma).__name__}." ) def _smoothed_ball_value_and_grad( self, z: "Vector", sigma: "BallSupportFunction" ) -> "tuple[float, Vector]": """Smoothed value and gradient of a ball support function at *z*. For $σ_{B,\\varepsilon}(z) = ⟨ z, c ⟩ + r\\sqrt{‖z‖^2 + \\varepsilon^2}$: .. math:: \\nabla_z σ_{B,\\varepsilon}(z) = c + r\\,\\frac{z}{\\sqrt{‖z‖^2 + \\varepsilon^2}} Args: z: Argument vector in the primal domain of *sigma*. sigma: A :class:`BallSupportFunction`. Returns: ``(value, grad)`` — smoothed scalar value and gradient. """ H = sigma.primal_domain c = sigma._center r = sigma._radius eps = self._epsilon center_term = H.inner_product(z, c) z_norm_sq = H.inner_product(z, z) denom = np.sqrt(z_norm_sq + eps * eps) value = center_term + r * denom # grad = c + (r / denom) * z grad = H.add(c, H.multiply(r / denom, z)) return float(value), grad def _smoothed_ellipsoid_value_and_grad( self, z: "Vector", sigma: "EllipsoidSupportFunction" ) -> "tuple[float, Vector]": """Smoothed value and gradient of an ellipsoid support function at *z*. For $σ_{E,\\varepsilon}(z) = ⟨ z, c ⟩ + r\\sqrt{⟨ z, A^{-1}z ⟩ + \\varepsilon^2}$: .. math:: \\nabla_z σ_{E,\\varepsilon}(z) = c + r\\,\\frac{A^{-1}z}{\\sqrt{⟨ z, A^{-1}z ⟩ + \\varepsilon^2}} Args: z: Argument vector in the primal domain of *sigma*. sigma: A :class:`EllipsoidSupportFunction`. Returns: ``(value, grad)`` — smoothed scalar value and gradient. Raises: NotImplementedError: If ``sigma._A_inv`` is ``None``. """ if sigma._A_inv is None: raise NotImplementedError( "EllipsoidSupportFunction requires inverse_operator to be set " "for use with SmoothedDualMaster." ) H = sigma.primal_domain c = sigma._center r = sigma._radius eps = self._epsilon A_inv_z = sigma._A_inv(z) center_term = H.inner_product(z, c) inner_term = H.inner_product(z, A_inv_z) # <z, A^{-1}z> denom = np.sqrt(max(inner_term, 0.0) + eps * eps) value = center_term + r * denom # grad = c + (r / denom) * A^{-1}z grad = H.add(c, H.multiply(r / denom, A_inv_z)) return float(value), grad # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------ def __call__(self, lam: "Vector") -> float: """Evaluate the smoothed objective $\\varphi_\\varepsilon(λ)$. Args: lam: Dual variable $λ ∈ D$. Returns: Smoothed objective value (float). Raises: NotImplementedError: If either support function is unsupported. """ cost = self._cost domain = cost.domain model_space = cost._model_space term1 = domain.inner_product(lam, cost._observed_data) Gstar_lam = cost._G.adjoint(lam) z1 = model_space.subtract(cost._Tstar_q, Gstar_lam) z2 = domain.negative(lam) term2, _ = self._eval_support(z1, cost._model_prior_support) term3, _ = self._eval_support(z2, cost._data_error_support) return term1 + term2 + term3
[docs] def gradient(self, lam: "Vector") -> "Vector": """Compute the gradient $\\nabla_λ \\varphi_\\varepsilon(λ)$. Uses the chain rule through $z_1 = T^*q - G^*λ$ and $z_2 = -λ$: .. math:: \\nabla_λ \\varphi_\\varepsilon = \\tilde{d} - G\\,\\nabla_{z_1}σ_{B,\\varepsilon}(z_1) - \\nabla_{z_2}σ_{V,\\varepsilon}(z_2) Args: lam: Dual variable $λ ∈ D$. Returns: Gradient vector in $D$. Raises: NotImplementedError: If either support function is unsupported. """ cost = self._cost domain = cost.domain model_space = cost._model_space Gstar_lam = cost._G.adjoint(lam) z1 = model_space.subtract(cost._Tstar_q, Gstar_lam) z2 = domain.negative(lam) _, grad_z1 = self._eval_support(z1, cost._model_prior_support) _, grad_z2 = self._eval_support(z2, cost._data_error_support) # Contribution from σ_B(T*q - G*λ): chain rule gives -G * grad_z1 term2_grad = domain.negative(cost._G(grad_z1)) # Contribution from σ_V(-λ): chain rule gives -grad_z2 term3_grad = domain.negative(grad_z2) return domain.add( cost._observed_data, domain.add(term2_grad, term3_grad), )
# --------------------------------------------------------------------------- # Smoothed L-BFGS-B Solver # ---------------------------------------------------------------------------
[docs] class SmoothedLBFGSSolver: """L-BFGS-B optimiser with smoothing continuation for DualMasterCostFunction. Uses Moreau-Yosida smoothing with a geometric continuation schedule: epsilon_0 >> epsilon_1 >> ... >> epsilon_{L-1} ≈ tol where epsilon_i = epsilon_0 × 10^{-i}. Each level is solved with L-BFGS-B, warm-starting from the previous solution. Note: Returns :class:`BundleResult` with ``gap=np.nan`` and ``f_low=np.nan`` — no gap certificate is available for smoothed methods. Args: cost: :class:`~pygeoinf.backus_gilbert.DualMasterCostFunction` instance. epsilon0: Initial smoothing parameter (default ``1e-2``). n_levels: Number of continuation levels (default ``5``). tolerance: Target accuracy; last epsilon is $\\varepsilon_0 × 10^{-(n\\_levels - 1)}$. max_iter_per_level: Maximum L-BFGS-B iterations per level (default ``500``). """ def __init__( self, cost: "object", /, *, epsilon0: float = 1e-2, n_levels: int = 5, tolerance: float = 1e-6, max_iter_per_level: int = 500, ) -> None: self._oracle = cost self._epsilon0 = float(epsilon0) self._n_levels = int(n_levels) self._tolerance = float(tolerance) self._max_iter_per_level = int(max_iter_per_level)
[docs] def solve(self, lam0: "Vector") -> BundleResult: """Run the smoothed L-BFGS-B continuation and return the result. Args: lam0: Starting point $λ_0 ∈ D$. Returns: :class:`BundleResult` with ``gap`` and ``f_low`` set to ``np.nan`` (no subgradient-based lower bound is maintained). """ cost = self._oracle domain = cost.domain lam_current = lam0 total_iters = 0 function_values: List[float] = [] eps_schedule = [ self._epsilon0 * (10.0 ** (-i)) for i in range(self._n_levels) ] scipy_result = None # will hold the last scipy OptimizeResult smoothed = None for eps in eps_schedule: smoothed = SmoothedDualMaster(cost, eps) x0_np = domain.to_components(lam_current) def f_and_g(x_np: np.ndarray, _sm=smoothed) -> "tuple[float, np.ndarray]": lam = domain.from_components(x_np) f = _sm(lam) g = _sm.gradient(lam) g_np = domain.to_components(g) return float(f), g_np scipy_result = minimize( f_and_g, x0_np, method="L-BFGS-B", jac=True, options={ "maxiter": self._max_iter_per_level, "ftol": 1e-15, "gtol": 1e-8, }, ) total_iters += scipy_result.nit function_values.append(float(scipy_result.fun)) lam_current = domain.from_components(scipy_result.x) # Evaluate at the final iterate using the finest smoothing level f_best = float(smoothed(lam_current)) return BundleResult( x_best=lam_current, f_best=f_best, f_low=float("nan"), gap=float("nan"), converged=scipy_result.success if scipy_result is not None else False, num_iterations=total_iters, num_serious_steps=0, function_values=function_values, iterates=None, )
# --------------------------------------------------------------------------- # PrimalKKTSolver # ---------------------------------------------------------------------------
[docs] class PrimalKKTSolver: r"""Primal KKT solver via Woodbury identity in abstract Hilbert spaces. The solver operates on abstract Hilbert-space vectors throughout: the model space $H$ is **never** discretised. Only the data space $D$ (which is explicitly finite-dimensional) is discretised — and only inside :meth:`_woodbury_solve`, which builds the $M \times M$ Woodbury system. The Woodbury reduction: .. math:: u^*(\lambda,\mu) = \frac{1}{\lambda} A_B^{-1} r_H - \frac{1}{\lambda} A_B^{-1} G^* K^{-1} G A_B^{-1} r_H where .. math:: r_H = c + \lambda A_B u_0 + \mu G^* A_V \tilde{d}, \qquad K = \tfrac{1}{\mu} A_V^{-1} + \tfrac{1}{\lambda} G A_B^{-1} G^* The $M \times M$ matrix $P = G A_B^{-1} G^*$ is precomputed once via ``.matrix(dense=True)`` which probes the **data space** only. No model-space ``to_components`` or ``from_components`` is ever called. **Ball/ball simplification** ($A_B = I_H$, $A_V = I_D$): .. math:: r_H = c + \lambda u_0 + \mu G^* \tilde{d}, \qquad P = G G^*, \qquad K = \tfrac{1}{\mu} I_D + \tfrac{1}{\lambda} G G^* so the only M×M solve is $K z = G w$ and no model-space matrix is ever formed. Supports :class:`~pygeoinf.convex_analysis.BallSupportFunction` and :class:`~pygeoinf.convex_analysis.EllipsoidSupportFunction` for both $B$ and $V$ (four combinations). Args: B: Model prior support function. V: Data error support function (ball or ellipsoid, centered at origin). G: Forward operator $G : H \to D$. d_tilde: Observed data vector in $D$. fsolve_tol: Tolerance for ``scipy.optimize.fsolve``. fsolve_maxfev: Maximum function evaluations for ``fsolve``. """ def __init__( self, B: "SupportFunction", V: "SupportFunction", G: "LinearOperator", d_tilde: "Vector", /, *, fsolve_tol: float = 1e-10, fsolve_maxfev: int = 200, ) -> None: # --- Validate B --- if isinstance(B, BallSupportFunction): self._prior_is_ball = True self._A_B_op = B.primal_domain.identity_operator() self._A_B_inv_op = B.primal_domain.identity_operator() elif isinstance(B, EllipsoidSupportFunction): if B._A_inv is None: raise ValueError( "EllipsoidSupportFunction B must supply inverse_operator " "(A^{-1}) to be used as a PrimalKKTSolver prior." ) self._prior_is_ball = False self._A_B_op = B._A self._A_B_inv_op = B._A_inv else: raise TypeError( f"B must be BallSupportFunction or EllipsoidSupportFunction, " f"got {type(B).__name__}" ) # --- Validate V --- if isinstance(V, BallSupportFunction): self._data_is_ball = True self._A_V_op = V.primal_domain.identity_operator() elif isinstance(V, EllipsoidSupportFunction): if V._A_inv is None: raise ValueError( "EllipsoidSupportFunction V must supply inverse_operator " "(A^{-1}) to be used as a PrimalKKTSolver data set." ) self._data_is_ball = False self._A_V_op = V._A else: raise TypeError( f"V must be BallSupportFunction or EllipsoidSupportFunction, " f"got {type(V).__name__}" ) self._B = B self._V = V self._G = G self._d_tilde = d_tilde self._fsolve_tol = fsolve_tol self._fsolve_maxfev = fsolve_maxfev self._model_space = B.primal_domain self._data_space = G.codomain M = self._data_space.dim self._M = M self._eta = float(B._radius) self._r = float(V._radius) self._u0 = B._center # --- G_adj_AV operator (G.adjoint ∘ A_V : D → H) --- # For ball V: A_V = I_D, so G_adj_AV = G.adjoint # For ellipsoid V: G_adj_AV = G.adjoint @ V._A (LinearOperator composition) if self._data_is_ball: self._G_adj_AV = G.adjoint else: self._G_adj_AV = G.adjoint @ V._A # --- Precomputed H-vectors --- self._A_B_u0 = self._A_B_op(self._u0) # H-vector self._G_adj_AV_d = self._G_adj_AV(d_tilde) # H-vector # --- A_V^{-1} matrix (M×M, data space only) --- if self._data_is_ball: self._AV_inv_mat = np.eye(M) else: self._AV_inv_mat = V._A_inv.matrix(dense=True) # --- P_mat = G A_B^{-1} G* (M×M, data space probing only!) --- if self._prior_is_ball: self._P_mat = (G @ G.adjoint).matrix(dense=True) else: self._P_mat = (G @ B._A_inv @ G.adjoint).matrix(dense=True) # --- Warm-start state --- self._lambda_prev: float = 1.0 self._mu_prev: float = 0.0 self._has_warm_start: bool = False def _woodbury_solve(self, lam: float, mu: float, c: "Vector") -> "Vector": r"""Return $u^*(\lambda, \mu)$ entirely in abstract Hilbert-space vectors. Solves .. math:: (\lambda A_B + \mu G^* A_V G)\, u^* = c + \lambda A_B u_0 + \mu G^* A_V \tilde{d} using the Woodbury identity. The only data-space discretisation is the $M \times M$ Cholesky solve for $K z = G w$. Args: lam: KKT multiplier $\lambda > 0$. mu: KKT multiplier $\mu \geq 0$. c: Objective direction (primal $H$-vector). Returns: $u^*$ as an abstract $H$-vector. """ from scipy.linalg import cho_factor, cho_solve ms = self._model_space ds = self._data_space # rhs = c + λ A_B(u0) + μ G_adj_AV(d̃) [abstract H] rhs_H = ms.add( c, ms.add( ms.multiply(lam, self._A_B_u0), ms.multiply(mu, self._G_adj_AV_d), ), ) # w = (1/λ) A_B_inv(rhs) [abstract H] w_H = ms.multiply(1.0 / lam, self._A_B_inv_op(rhs_H)) if mu == 0.0: return w_H # p = G(w) [data-space vector] p_D = self._G(w_H) p_comps = ds.to_components(p_D) # K = (1/μ) A_V^{-1} + (1/λ) P_mat [M×M] K_mat = (1.0 / mu) * self._AV_inv_mat + (1.0 / lam) * self._P_mat cho_decomp = cho_factor(K_mat) z_comps = cho_solve(cho_decomp, p_comps) z_D = ds.from_components(z_comps) # correction = (1/λ) A_B_inv(G*(z)) [abstract H] # NOTE: uses G.adjoint (plain adjoint), NOT G_adj_AV correction_H = ms.multiply( 1.0 / lam, self._A_B_inv_op(self._G.adjoint(z_D)) ) return ms.subtract(w_H, correction_H) def _residuals( self, lam: float, mu: float, c: "Vector" ) -> "tuple[float, float]": r"""KKT residuals $(\rho_1, \rho_2)$ at $(\lambda, \mu)$. .. math:: \rho_1 = \langle A_B(u^* - u_0),\, u^* - u_0 \rangle_H - \eta^2 \rho_2 = \langle A_V(G u^* - \tilde{d}),\, G u^* - \tilde{d} \rangle_D - r^2 Args: lam: $\lambda > 0$. mu: $\mu \geq 0$. c: Objective direction. Returns: ``(rho1, rho2)`` floats. """ ms = self._model_space ds = self._data_space u = self._woodbury_solve(lam, mu, c) diff_u = ms.subtract(u, self._u0) # For ball B: A_B_op = identity, so <diff_u, diff_u>_H = ||u* - u0||_H^2 # For ellipsoid B: <A_B(diff_u), diff_u>_H rho1 = float(ms.inner_product(self._A_B_op(diff_u), diff_u)) - self._eta ** 2 res_d = ds.subtract(self._G(u), self._d_tilde) rho2 = float(ds.inner_product(self._A_V_op(res_d), res_d)) - self._r ** 2 return rho1, rho2
[docs] def solve(self, c: "Vector") -> KKTResult: r"""Solve for $u^*$ maximising $\langle c, u \rangle$ over the feasible set. Uses a two-branch strategy: 1. Compute the support point $u_{\rm ball}$ of $B$ in direction $c$. If the data constraint is satisfied, return immediately. 2. Otherwise both constraints are active; solve the $2 \times 2$ root-finding problem in log-space via ``scipy.optimize.fsolve``, warm-started from $(\lambda_{\rm prev}, \mu_{\rm prev})$. The model space is **never** discretised during this method. Args: c: Linear objective in model space $H$. Returns: :class:`KKTResult` with the optimal model vector $u^*$, KKT multipliers, convergence flag, and iteration count. """ from scipy.optimize import fsolve ms = self._model_space ds = self._data_space # ------------------------------------------------------------------ # Branch 1: prior-only (data constraint not active) # ------------------------------------------------------------------ _, u_ball = self._B.value_and_support_point(c) if u_ball is None: u_ball = self._B.support_point(c) if u_ball is not None: res_data = ds.subtract(self._G(u_ball), self._d_tilde) data_sq = float(ds.inner_product(self._A_V_op(res_data), res_data)) if data_sq <= self._r ** 2 * (1.0 + 1e-9): # Compute λ* from the active prior constraint if self._prior_is_ball: c_norm = float(ms.norm(c)) lam_star = c_norm / self._eta if c_norm > 1e-14 else 1.0 else: c_norm_AB = float( np.sqrt(max(float(ms.inner_product(self._B._A_inv(c), c)), 0.0)) ) lam_star = c_norm_AB / self._eta if c_norm_AB > 1e-14 else 1.0 return KKTResult( m=u_ball, multipliers=(lam_star, 0.0), converged=True, num_iterations=1, ) # ------------------------------------------------------------------ # Branch 2: both constraints active — 2D fsolve in log space # ------------------------------------------------------------------ if self._prior_is_ball: c_norm = float(ms.norm(c)) lam_phys = max(c_norm / self._eta, 1e-4) else: c_norm_AB = float( np.sqrt(max(float(ms.inner_product(self._B._A_inv(c), c)), 0.0)) ) lam_phys = max(c_norm_AB / self._eta, 1e-4) if self._has_warm_start and self._lambda_prev > 1e-4: lam0 = self._lambda_prev else: lam0 = lam_phys if self._has_warm_start and self._mu_prev > 1e-8: mu0 = self._mu_prev else: mu0 = 1e-3 def _residual_log(log_x: np.ndarray) -> np.ndarray: log_x_safe = np.clip(log_x, -30.0, 25.0) lam = float(np.exp(log_x_safe[0])) mu = float(np.exp(log_x_safe[1])) r1, r2 = self._residuals(lam, mu, c) return np.array([r1, r2]) x0_log = np.array([np.log(lam0), np.log(mu0)]) sol, info, ier, _mesg = fsolve( _residual_log, x0_log, full_output=True, xtol=self._fsolve_tol, maxfev=self._fsolve_maxfev, ) if ier != 1: fallback_guesses = [ (lam_phys, 1e-3), (lam_phys, 1e-2), (lam_phys, 0.5), (lam_phys, 2.0), (0.5 * lam_phys, 1e-2), (0.5 * lam_phys, 1.0), ] for alt_lam, alt_mu in fallback_guesses: alt_x0 = np.array([ np.log(max(alt_lam, 1e-4)), np.log(max(alt_mu, 1e-8)), ]) alt_sol, alt_info, alt_ier, _ = fsolve( _residual_log, alt_x0, full_output=True, xtol=self._fsolve_tol, maxfev=self._fsolve_maxfev, ) if alt_ier == 1: sol, info, ier = alt_sol, alt_info, alt_ier break sol_safe = np.clip(sol, -30.0, 25.0) lam_sol = float(np.exp(sol_safe[0])) mu_sol = float(np.exp(sol_safe[1])) converged = ier == 1 u_star = self._woodbury_solve(lam_sol, mu_sol, c) self._lambda_prev = lam_sol self._mu_prev = mu_sol self._has_warm_start = converged return KKTResult( m=u_star, multipliers=(lam_sol, mu_sol), converged=converged, num_iterations=int(info["nfev"]), )