"""
Provides concrete implementations of function spaces on the two-sphere (S²).
This module uses the abstract framework from the symmetric space module to create
fully-featured `Lebesgue` (L²) and `Sobolev` (Hˢ) Hilbert spaces for functions
defined on the surface of a sphere.
It utilizes the `pyshtools` library for highly efficient and accurate spherical
harmonic transforms. Following a compositional design, this module first
defines a base `Lebesgue` space and then constructs the `Sobolev` space as a
`MassWeightedHilbertSpace` over it. The module also includes powerful plotting
utilities built on `cartopy` for professional-quality geospatial visualization.
"""
from __future__ import annotations
from typing import Callable, Any, List, Optional, Tuple, TYPE_CHECKING, Union
import math
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import numpy as np
from joblib import Parallel, delayed
from scipy.spatial import cKDTree
import cartopy.io.shapereader as shpreader
import shapely.geometry as sgeom
from shapely.prepared import prep
try:
import pyshtools as sh
from pyshtools.shio import SHCilmToVector
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from cartopy.mpl.ticker import LongitudeFormatter, LatitudeFormatter
from cartopy.mpl.geoaxes import GeoAxes
except ImportError:
raise ImportError(
"pyshtools and cartopy are required for the sphere module. "
"Please install them with 'pip install pygeoinf[sphere]'"
) from None
from pygeoinf.hilbert_space import EuclideanSpace
from pygeoinf.linear_forms import LinearForm
from pygeoinf.linear_operators import LinearOperator
from pygeoinf.datasets import load_gsn_stations
from .symmetric_space import AbstractSymmetricLebesgueSpace, SymmetricSobolevSpace
if TYPE_CHECKING:
from cartopy.crs import Projection
from pyshtools import SHGrid
[docs]
class Lebesgue(AbstractSymmetricLebesgueSpace):
"""
Implementation of the Lebesgue space L² on a sphere.
This class represents square-integrable functions on a sphere. A function is
represented by its values on an evenly spaced grid. The co-ordinate basis for
the space is through spherical harmonic expansions.
"""
def __init__(
self,
lmax: int,
/,
*,
radius: float = 1,
grid: str = "DH",
extend: bool = True,
):
"""
Args:
lmax: Maximum degree for the expansions.
radius: Radius of the sphere. Defaults to 1.
grid: pyshtools grid type. Defaults to "DH"
extend: If true longitudes wrap fully. Defaults to True.
"""
if lmax < 0:
raise ValueError("lmax must be non-negative")
self._lmax: int = lmax
self._radius: float = radius
if grid == "DH2":
self._grid = "DH"
self._sampling = 2
else:
self._grid = grid
self._sampling = 1
self._extend: bool = extend
# SH coefficient options fixed internally
self._normalization: str = "ortho"
self._csphase: int = 1
AbstractSymmetricLebesgueSpace.__init__(self, 2, lmax, (lmax + 1) ** 2, False)
[docs]
@classmethod
def from_covariance(
cls,
covariance_function: Callable[[float], float],
/,
*,
radius: float = 1.0,
grid: str = "DH",
extend: bool = True,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
power_of_two: bool = False,
) -> Lebesgue:
"""
Creates an instance of the L² space with a spherical harmonic truncation degree (`lmax`)
automatically chosen to capture the expected energy of functions drawn from
a specified prior measure.
This factory method calculates the expected squared norm (energy) of a random field
whose spectral variances are defined by the provided `covariance_function`. It iteratively
adds higher spherical harmonic degrees until the relative contribution of the next degree
drops below the specified relative tolerance.
Args:
covariance_function: A callable mapping a Laplacian eigenvalue to its spectral variance.
radius: The radius of the sphere. Defaults to 1.0.
grid: The pyshtools spatial grid format to use. Defaults to "DH".
extend: If True, the spatial grid includes both 0 and 360-degree longitudes.
rtol: The relative tolerance for the energy truncation. Defaults to 1e-6.
min_degree: The absolute minimum truncation degree to return. Defaults to 0.
max_degree: An optional safety ceiling for the truncation degree. If convergence
is not reached by this degree, the search terminates and returns this value.
power_of_two: If True, rounds the resulting `lmax` up to the nearest power of two.
Returns:
Lebesgue: A fully instantiated L² space on the sphere with the optimal `lmax`.
Raises:
RuntimeError: If the energy sequence fails to converge within 100,000 iterations
and no `max_degree` is specified.
"""
dummy_space = cls(max(1, min_degree), radius=radius, grid=grid, extend=extend)
optimal_degree = dummy_space.estimate_truncation_degree(
covariance_function, rtol=rtol, min_degree=min_degree, max_degree=max_degree
)
if power_of_two:
n = int(np.log2(optimal_degree))
optimal_degree = 2 ** (n + 1)
return cls(optimal_degree, radius=radius, grid=grid, extend=extend)
[docs]
@classmethod
def from_heat_kernel_prior(
cls,
kernel_scale: float,
/,
*,
radius: float = 1.0,
grid: str = "DH",
extend: bool = True,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
power_of_two: bool = False,
) -> Lebesgue:
"""
Creates an instance of the L² space on the sphere, tuned to the expected
energy of a Heat Kernel prior measure.
Args:
kernel_scale: The length-scale parameter of the heat kernel covariance.
radius: The radius of the sphere. Defaults to 1.0.
grid: The pyshtools spatial grid format to use. Defaults to "DH".
extend: If True, the spatial grid includes both 0 and 360-degree longitudes.
rtol: The relative tolerance for the energy truncation. Defaults to 1e-6.
min_degree: The absolute minimum truncation degree to return. Defaults to 0.
max_degree: An optional safety ceiling for the truncation degree.
power_of_two: If True, rounds the resulting `lmax` up to the nearest power of two.
"""
return cls.from_covariance(
cls.heat_kernel(kernel_scale),
radius=radius,
grid=grid,
extend=extend,
rtol=rtol,
min_degree=min_degree,
max_degree=max_degree,
power_of_two=power_of_two,
)
[docs]
@classmethod
def from_sobolev_kernel_prior(
cls,
kernel_order: float,
kernel_scale: float,
/,
*,
radius: float = 1.0,
grid: str = "DH",
extend: bool = True,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
power_of_two: bool = False,
) -> Lebesgue:
"""
Creates an instance of the L² space on the sphere, tuned to the expected
energy of a Sobolev-type prior measure.
Args:
kernel_order: The smoothness order of the Sobolev prior measure.
kernel_scale: The length-scale parameter of the Sobolev prior measure.
radius: The radius of the sphere. Defaults to 1.0.
grid: The pyshtools spatial grid format to use. Defaults to "DH".
extend: If True, the spatial grid includes both 0 and 360-degree longitudes.
rtol: The relative tolerance for the energy truncation. Defaults to 1e-6.
min_degree: The absolute minimum truncation degree to return. Defaults to 0.
max_degree: An optional safety ceiling for the truncation degree.
power_of_two: If True, rounds the resulting `lmax` up to the nearest power of two.
"""
return cls.from_covariance(
cls.sobolev_kernel(kernel_order, kernel_scale),
radius=radius,
grid=grid,
extend=extend,
rtol=rtol,
min_degree=min_degree,
max_degree=max_degree,
power_of_two=power_of_two,
)
# ------------------------------------------------------ #
# Properties #
# ------------------------------------------------------ #
@property
def lmax(self) -> int:
"""The maximum spherical harmonic truncation degree."""
return self._lmax
@property
def radius(self) -> float:
"""The radius of the sphere."""
return self._radius
@property
def grid(self) -> str:
"""The `pyshtools` grid type used for spatial representations."""
return self._grid
@property
def sampling(self) -> int:
"""The sampling factor used for spatial representations."""
return self._sampling
@property
def extend(self) -> bool:
"""True if the spatial grid includes both 0 and 360-degree longitudes."""
return self._extend
@property
def normalization(self) -> str:
"""The spherical harmonic normalization convention used ('ortho')."""
return self._normalization
@property
def csphase(self) -> int:
"""The Condon-Shortley phase convention used (1)."""
return self._csphase
@property
def grid_type(self) -> str:
"""
Returns the pyshtools grid type.
"""
return self.grid if self._sampling == 1 else "DH2"
@property
def gaussian_curvature(self) -> float:
return 1.0 / (self.radius**2)
# ------------------------------------------------------ #
# Public methods #
# ------------------------------------------------------ #
[docs]
def project_function(self, f: Callable[[(float, float)], float]) -> np.ndarray:
"""
Returns an element of the space by projecting a given function.
Args:
f: A function that takes a point `(lat, lon)` and returns a value.
"""
u = sh.SHGrid.from_zeros(
self.lmax, grid=self.grid, extend=self.extend, sampling=self._sampling
)
for j, lon in enumerate(u.lons()):
for i, lat in enumerate(u.lats()):
u.data[i, j] = f((lat, lon))
return u
[docs]
def to_coefficients(self, u: sh.SHGrid) -> sh.SHCoeffs:
"""Maps a function vector to its spherical harmonic coefficients."""
return u.expand(normalization=self.normalization, csphase=self.csphase)
[docs]
def from_coefficients(self, ulm: sh.SHCoeffs) -> sh.SHGrid:
"""Maps spherical harmonic coefficients to a function vector."""
grid = self.grid if self._sampling == 1 else "DH2"
return ulm.expand(grid=grid, extend=self.extend)
[docs]
def sample_power_measure(
self,
measure,
n_samples,
/,
*,
lmin=None,
lmax=None,
parallel: bool = False,
n_jobs: int = -1,
):
"""
Takes in a Gaussian measure on the space, draws n_samples from
and returns samples for the spherical harmonic power at degrees in
the indicated range.
"""
lmin = 0 if lmin is None else lmin
lmax = self.lmax if lmax is None else min(self.lmax, lmax)
samples = measure.samples(n_samples, parallel=parallel, n_jobs=n_jobs)
powers = []
for u in samples:
ulm = self.to_coefficients(u)
powers.append(ulm.spectrum(lmax=lmax, convention="power")[lmin:])
return powers
[docs]
@staticmethod
def iris_stations(n_stations: int = None, include_names: bool = False):
"""
Convenience method to retrieve a globally distributed set of
Global Seismograph Network (GSN) stations from IRIS.
Args:
n_stations: The number of random stations to return. If None,
returns the entire available network (~177 stations).
include_names: If True, returns (Name, Latitude, Longitude).
If False, returns (Latitude, Longitude) tuples.
Returns:
A list of tuples representing the station coordinates in degrees.
"""
from pygeoinf.datasets import load_gsn_stations
return load_gsn_stations(n_stations=n_stations, include_names=include_names)
[docs]
@staticmethod
def random_earthquakes(n_points: int, min_magnitude: float = 5.0):
"""
Returns a random sample of real, sensible earthquake locations.
This provides a highly clustered, non-uniform spatial distribution
along tectonic boundaries, perfect for testing robust spatial operators.
Args:
n_points: The exact number of locations to return.
min_magnitude: The minimum magnitude threshold for the catalog.
Returns:
A list of (Latitude, Longitude) tuples in degrees.
"""
from pygeoinf.datasets import sample_earthquakes
# The FDSN API returns (lat, lon, depth). We strip the depth
# since we are operating on the 2D surface of the sphere.
events = sample_earthquakes(n_points, min_magnitude=min_magnitude)
return [(lat, lon) for lat, lon, depth in events]
# ------------------------------------------------------ #
# Methods for SymmetricHilbertSpace #
# ------------------------------------------------------ #
[docs]
def index_to_integer(self, k: Tuple[int, int]) -> int:
l, m = k
if abs(m) > l or l < 0:
raise ValueError("Invalid spherical harmonic: |m| must be <= l, and l >= 0")
# Pure Python is much faster for scalars
return l**2 + m if m >= 0 else l**2 + l + abs(m)
[docs]
def integer_to_index(self, i: int) -> Tuple[int, int]:
if i < 0:
raise ValueError("Index cannot be negative.")
# math.isqrt is vastly faster than np.floor(np.sqrt(i))
l = math.isqrt(i)
r = i - l**2
m = r if r <= l else l - r
return l, m
[docs]
def laplacian_eigenvalue(self, k: Tuple[int, int]) -> float:
"""
Returns the (l.m)-th eigenvalue of the Laplacian.
Args:
k = (l,m): The index of the eigenvalue to return.
"""
l = k[0]
return l * (l + 1) / self.radius**2
[docs]
def laplacian_eigenvector_squared_norm(self, k: Tuple[int, int]) -> float:
return self.radius**2
[docs]
def laplacian_eigenvectors_at_point(self, point: Tuple[float, float]) -> np.ndarray:
latitude, longitude = point
colatitude = 90.0 - latitude
coeffs = sh.expand.spharm(
self.lmax,
colatitude,
longitude,
normalization=self.normalization,
degrees=True,
)
return SHCilmToVector(coeffs)
[docs]
def random_point(self) -> Tuple[float, float]:
"""Returns a random point as `[latitude, longitude]`."""
latitude = np.rad2deg(np.arcsin(np.random.uniform(-1.0, 1.0)))
longitude = np.random.uniform(0.0, 360.0)
return (latitude, longitude)
[docs]
def geodesic_distance(
self, p1: Tuple[float, float], p2: Tuple[float, float]
) -> float:
"""Returns the great-circle distance between two points on the sphere."""
v1, v2 = self._to_vector(*p1), self._to_vector(*p2)
dot_product = np.clip(np.dot(v1, v2), -1.0, 1.0)
omega = np.arccos(dot_product)
return float(self.radius * omega)
[docs]
def geodesic_quadrature(
self, p1: Tuple[float, float], p2: Tuple[float, float], n_points: int
) -> Tuple[List[Tuple[float, float]], np.ndarray]:
"""Generates Gauss-Legendre quadrature points and weights along a great-circle arc."""
arc_length = self.geodesic_distance(p1, p2)
omega = arc_length / self.radius
if omega < 1e-10:
return [p1] * n_points, np.zeros(n_points)
if np.abs(omega - np.pi) < 1e-10:
raise ValueError(
"Points are antipodal; the great circle path is not unique."
)
v1, v2 = self._to_vector(*p1), self._to_vector(*p2)
x, w = np.polynomial.legendre.leggauss(n_points)
t_vals = (x + 1) / 2.0
scaled_weights = w * (arc_length / 2.0)
sin_omega = np.sin(omega)
points = []
for t in t_vals:
coeff1 = np.sin((1 - t) * omega) / sin_omega
coeff2 = np.sin(t * omega) / sin_omega
v_interp = coeff1 * v1 + coeff2 * v2
points.append(self._to_latlon(v_interp))
return points, scaled_weights
[docs]
def geodesic_ball_quadrature(
self, center: Tuple[float, float], radius: float, n_points: int
) -> Tuple[List[Tuple[float, float]], np.ndarray]:
r"""Return a deterministic quadrature rule for a spherical cap.
The rule uses Gauss-Legendre nodes in the polar variable
$u = \cos(\gamma)$ and a uniform trapezoidal rule in azimuth on each
radial ring. The weights include the sphere area element, so their sum
equals the physical cap area.
"""
if n_points <= 0:
raise ValueError("Geodesic-ball quadrature requires at least one point.")
if radius < 0.0 or radius > np.pi * self.radius:
raise ValueError(
"Geodesic-ball radius on the sphere must lie in [0, pi * radius]."
)
if radius <= 0.0:
return [center] * n_points, np.zeros(n_points)
angular_radius = radius / self.radius
cos_alpha = np.cos(angular_radius)
n_radial = min(n_points, max(1, int(np.sqrt(n_points))))
radial_nodes, radial_weights = np.polynomial.legendre.leggauss(n_radial)
interval_half_width = 0.5 * (1.0 - cos_alpha)
u_nodes = interval_half_width * (radial_nodes + 1.0) + cos_alpha
u_weights = interval_half_width * radial_weights
ring_radii = np.sqrt(np.clip(1.0 - u_nodes**2, 0.0, None))
ring_counts = self._distribute_ring_point_counts(n_points, ring_radii)
center_vector = self._to_vector(*center)
tangent_1, tangent_2 = self._tangent_frame(center_vector)
points: List[Tuple[float, float]] = []
weights: List[float] = []
for u_value, u_weight, ring_radius, ring_count in zip(
u_nodes, u_weights, ring_radii, ring_counts
):
azimuths = 2.0 * np.pi * np.arange(ring_count, dtype=float) / ring_count
ring_vectors = u_value * center_vector[None, :] + ring_radius * (
np.cos(azimuths)[:, None] * tangent_1[None, :]
+ np.sin(azimuths)[:, None] * tangent_2[None, :]
)
ring_vectors /= np.linalg.norm(ring_vectors, axis=1, keepdims=True)
points.extend(self._to_latlon(vec) for vec in ring_vectors)
point_weight = self.radius**2 * (2.0 * np.pi * u_weight / ring_count)
weights.extend([point_weight] * ring_count)
return points, np.asarray(weights)
[docs]
def spherical_cap_integral(
self,
center: Tuple[float, float],
angular_radius: float,
/,
*,
normalize: bool = False,
) -> LinearForm:
r"""Return the exact spherical-cap integral functional.
The returned form represents
$\int_C u(x)\,dS(x)$, where ``C`` is the spherical cap centered at
``center`` with angular radius ``angular_radius``. If ``normalize`` is
true, the returned form represents the cap average
$|C|^{-1}\int_C u(x)\,dS(x)$.
The implementation uses ``pyshtools.SHCoeffs.from_cap`` to compute the
spherical-harmonic coefficients of the cap indicator exactly in the
truncated basis. The pyshtools cap is normalized to global average one,
so it is rescaled to either the physical cap indicator or normalized
cap average.
Args:
center: ``(latitude, longitude)`` cap center in degrees.
angular_radius: Cap radius in radians on the unit sphere.
normalize: If true, return the cap average instead of the raw
surface integral.
"""
if angular_radius < 0.0 or angular_radius > np.pi:
raise ValueError("Spherical cap angular radius must lie in [0, pi].")
cap_area_unit_sphere = 2.0 * np.pi * (1.0 - np.cos(angular_radius))
if cap_area_unit_sphere <= 0.0:
if normalize:
raise ValueError("Cannot normalize a zero-area spherical cap.")
return LinearForm(self, components=np.zeros(self.dim))
cap_coefficients = sh.SHCoeffs.from_cap(
np.degrees(angular_radius),
self.lmax,
clat=center[0],
clon=center[1],
normalization=self.normalization,
csphase=self.csphase,
kind="real",
degrees=True,
)
components = self._coefficient_to_component(cap_coefficients)
if normalize:
components = components / (4.0 * np.pi)
else:
components = (
components * self.radius**2 * cap_area_unit_sphere / (4.0 * np.pi)
)
return LinearForm(self, components=components)
[docs]
def spherical_cap_average(
self, center: Tuple[float, float], angular_radius: float, /
) -> LinearForm:
r"""Return the exact average functional over a spherical cap."""
return self.spherical_cap_integral(
center,
angular_radius,
normalize=True,
)
[docs]
def geodesic_ball_integral(
self,
center: Tuple[float, float],
radius: float,
/,
*,
n_points: Optional[int] = None,
normalize: bool = False,
) -> LinearForm:
r"""Return the exact integral over a geodesic ball on the sphere.
The geodesic radius is given in the same physical units as
``geodesic_distance``. On a sphere this corresponds to a spherical cap
with angular radius ``radius / self.radius``.
"""
if n_points is not None:
return super().geodesic_ball_integral(
center,
radius,
n_points=n_points,
normalize=normalize,
)
angular_radius = radius / self.radius
return self.spherical_cap_integral(
center,
angular_radius,
normalize=normalize,
)
[docs]
def pairs_within_distance(
self, points: List[Tuple[float, float]], max_distance: float
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
# 1. Convert all (lat, lon) points to 3D Cartesian vectors
vecs = (
np.array([self._to_vector(lat, lon) for lat, lon in points]) * self.radius
)
# 2. Build the K-D Tree
tree = cKDTree(vecs)
# 3. Map geodesic max distance to Euclidean chord distance
# d_chord = 2 * R * sin(d_geodesic / (2R))
max_chord = 2.0 * self.radius * np.sin(max_distance / (2.0 * self.radius))
# 4. Query the tree (returns a sparse DOK matrix)
dok = tree.sparse_distance_matrix(tree, max_chord)
coo = dok.tocoo()
# 5. Convert chord distances back to geodesic distances
# d_geodesic = 2 * R * arcsin(d_chord / (2R))
ratio = np.clip(coo.data / (2.0 * self.radius), -1.0, 1.0)
geo_dists = 2.0 * self.radius * np.arcsin(ratio)
# Note: sparse_distance_matrix explicitly drops zero-distance pairs (the diagonal!)
# We must manually inject the diagonal back in for distance = 0.0
n = len(points)
rows = np.concatenate([coo.row, np.arange(n)])
cols = np.concatenate([coo.col, np.arange(n)])
final_dists = np.concatenate([geo_dists, np.zeros(n)])
return rows, cols, final_dists
[docs]
def with_degree(self, degree: int) -> Lebesgue:
return Lebesgue(degree, radius=self.radius, grid=self.grid, extend=self.extend)
[docs]
def degree_transfer_operator(self, target_degree: int) -> LinearOperator:
"""
Returns the transfer operator from this space to one with a different degree.
This operator leverages the hierarchical nature of the 1D SH vector to
efficiently truncate or zero-pad the coefficients.
"""
codomain = self.with_degree(target_degree)
def mapping(u: sh.SHGrid) -> sh.SHGrid:
vec_in = self.to_components(u)
target_size = (target_degree + 1) ** 2
if target_size > vec_in.size:
vec_out = np.pad(vec_in, (0, target_size - vec_in.size))
else:
vec_out = vec_in[:target_size]
return codomain.from_components(vec_out)
def adjoint_mapping(v: sh.SHGrid) -> sh.SHGrid:
vec_in = codomain.to_components(v)
target_size = (self.lmax + 1) ** 2
if target_size > vec_in.size:
vec_out = np.pad(vec_in, (0, target_size - vec_in.size))
else:
vec_out = vec_in[:target_size]
return self.from_components(vec_out)
return LinearOperator(self, codomain, mapping, adjoint_mapping=adjoint_mapping)
[docs]
def invariant_covariance_function(
self, spectral_variances: np.ndarray
) -> Callable[[np.ndarray], np.ndarray]:
degree_variances = np.zeros(self.lmax + 1)
for l in range(self.lmax + 1):
idx = self.index_to_integer((l, 0))
degree_variances[l] = spectral_variances[idx]
# Spherical harmonic addition theorem coefficients:
# (2l + 1) / (4 * pi * R^2)
coeffs = (
degree_variances
* (2 * np.arange(self.lmax + 1) + 1)
/ (4 * np.pi * self.radius**2)
)
def cov_evaluator(distances: np.ndarray) -> np.ndarray:
cos_theta = np.cos(distances / self.radius)
return np.polynomial.legendre.legval(cos_theta, coeffs)
return cov_evaluator
[docs]
def degree_multiplicity(self, degree: int) -> int:
return 2 * degree + 1
[docs]
def representative_index(self, degree: int) -> Tuple[int, int]:
return (degree, 0)
# ------------------------------------------------------ #
# Methods for HilbertSpace #
# ------------------------------------------------------ #
[docs]
def to_components(self, u: sh.SHGrid) -> np.ndarray:
coeff = self.to_coefficients(u)
return self._coefficient_to_component(coeff)
[docs]
def from_components(self, c: np.ndarray) -> sh.SHGrid:
coeff = self._component_to_coefficients(c)
return self.from_coefficients(coeff)
[docs]
def ax(self, a: float, x: sh.SHGrid) -> None:
"""
Custom in-place ax implementation for pyshtools objects.
x := a*x
"""
x.data *= a
[docs]
def axpy(self, a: float, x: sh.SHGrid, y: sh.SHGrid) -> None:
"""
Custom in-place axpy implementation for pyshtools objects.
y := a*x + y
"""
y.data += a * x.data
def __eq__(self, other: object) -> bool:
"""
Checks for mathematical equality with another Sobolev space on a sphere.
Two spaces are considered equal if they are of the same type and have
the same defining parameters (kmax, order, scale, and radius).
"""
if not isinstance(other, Lebesgue):
return NotImplemented
return (
self.lmax == other.lmax
and self.radius == other.radius
and self.grid == other.grid
and self.extend == other.extend
)
[docs]
def is_element(self, x: Any) -> bool:
"""
Checks if an object is a valid element of the space.
"""
if not isinstance(x, sh.SHGrid):
return False
if not x.lmax == self.lmax:
return False
if not x.grid == self.grid_type:
return False
if not x.extend == self.extend:
return False
return True
# ------------------------------------------------------ #
# Methods for HilbertModule #
# ------------------------------------------------------ #
[docs]
def vector_multiply(self, x1: sh.SHGrid, x2: sh.SHGrid) -> sh.SHGrid:
"""
Computes the pointwise product of two functions.
"""
return x1 * x2
[docs]
def vector_sqrt(self, x: sh.SHGrid) -> sh.SHGrid:
"""
Returns the pointwise square root of a function.
"""
y = x.copy()
y.data = np.sqrt(x.data)
return y
# ------------------------------------------------------ #
# Additional methods #
# ------------------------------------------------------ #
[docs]
def to_coefficient_operator(self, lmax: int, /, *, lmin: int = 0):
r"""
Returns a LinearOperator mapping a function to its spherical harmonic coefficients.
The operator maps an element of the Hilbert space to a vector in \(\mathbb{R}^k\).
The coefficients in the output vector follow the native pyshtools ordering:
ordered by degree $l$ (major), and for each degree, the order $m$ is sorted
as $0, 1, \dots, l, -1, \dots, -l$.
Args:
lmax: The maximum spherical harmonic degree to include in the output.
lmin: The minimum spherical harmonic degree to include. Defaults to 0.
Returns:
A LinearOperator mapping `SHGrid` -> `numpy.ndarray`.
"""
vector_size = (lmax + 1) ** 2 - lmin**2
codomain = EuclideanSpace(vector_size)
def mapping(u: sh.SHGrid) -> np.ndarray:
vec = self.to_components(u)
target_size = (lmax + 1) ** 2
if target_size > vec.size:
vec = np.pad(vec, (0, target_size - vec.size))
else:
vec = vec[:target_size]
# Truncate lower degrees if lmin > 0
return vec[lmin**2 :] if lmin > 0 else vec
def adjoint_mapping(data: np.ndarray) -> sh.SHGrid:
# Pad missing lower degrees if lmin > 0
vec = np.concatenate((np.zeros(lmin**2), data)) if lmin > 0 else data
target_size = (self.lmax + 1) ** 2
if target_size > vec.size:
vec = np.pad(vec, (0, target_size - vec.size))
else:
vec = vec[:target_size]
return self.from_components(vec) / self.radius**2
return LinearOperator(self, codomain, mapping, adjoint_mapping=adjoint_mapping)
[docs]
def from_coefficient_operator(self, lmax: int, /, *, lmin: int = 0):
r"""
Returns a LinearOperator mapping a vector of coefficients to a function.
The operator maps a vector in \(\mathbb{R}^k\) to an element of the Hilbert space.
The input vector must follow the native pyshtools ordering: ordered by
degree $l$ (major), and for each degree, the order $m$ is sorted as
$0, 1, \dots, l, -1, \dots, -l$.
Args:
lmax: The maximum spherical harmonic degree expected in the input.
lmin: The minimum spherical harmonic degree expected. Defaults to 0.
Returns:
A LinearOperator mapping `numpy.ndarray` -> `SHGrid`.
"""
vector_size = (lmax + 1) ** 2 - lmin**2
domain = EuclideanSpace(vector_size)
def mapping(data: np.ndarray) -> sh.SHGrid:
# Pad missing lower degrees if lmin > 0
vec = np.concatenate((np.zeros(lmin**2), data)) if lmin > 0 else data
target_size = (self.lmax + 1) ** 2
if target_size > vec.size:
vec = np.pad(vec, (0, target_size - vec.size))
else:
vec = vec[:target_size]
return self.from_components(vec)
def adjoint_mapping(u: sh.SHGrid) -> np.ndarray:
vec = self.to_components(u)
target_size = (lmax + 1) ** 2
if target_size > vec.size:
vec = np.pad(vec, (0, target_size - vec.size))
else:
vec = vec[:target_size]
# Truncate lower degrees if lmin > 0
vec = vec[lmin**2 :] if lmin > 0 else vec
return vec * self.radius**2
return LinearOperator(domain, self, mapping, adjoint_mapping=adjoint_mapping)
[docs]
def random_domain_points(
self, n: int, /, *, keep_ocean: bool = True, resolution: str = "110m"
) -> List[Tuple[float, float]]:
"""
Generates a list of random points strictly bounded to either the land or the ocean.
This uses Cartopy's Natural Earth geometries and Shapely's spatial indexing
to perform ultra-fast rejection sampling.
Args:
n: The exact number of points to generate.
keep_ocean: If True, returns only ocean points. If False, returns only land points.
resolution: The Cartopy shapefile resolution ('110m', '50m', or '10m').
Returns:
A list of (latitude, longitude) tuples in degrees.
"""
# 1. Fetch and prep the land geometry
shpfilename = shpreader.natural_earth(
resolution=resolution, category="physical", name="land"
)
reader = shpreader.Reader(shpfilename)
combined_land = sgeom.MultiPolygon(list(reader.geometries()))
prepared_land = prep(combined_land)
valid_points = []
# Earth is ~71% ocean and ~29% land. We over-generate our batches
# to ensure we hit our target 'n' in as few loop iterations as possible.
over_generate_factor = 1.5 if keep_ocean else 3.5
while len(valid_points) < n:
needed = n - len(valid_points)
batch_size = max(10, int(needed * over_generate_factor))
# Generate a raw batch using the standard sphere method
batch = self.random_points(batch_size)
for lat, lon in batch:
# Wrap [0, 360] to [-180, 180] strictly for Shapely's geometry bounds
wrapped_lon = (lon + 180) % 360 - 180
point = sgeom.Point(wrapped_lon, lat)
is_on_land = prepared_land.contains(point)
if keep_ocean and not is_on_land:
valid_points.append((lat, lon))
elif not keep_ocean and is_on_land:
valid_points.append((lat, lon))
if len(valid_points) == n:
break
return valid_points
[docs]
def domain_mask(
self, /, *, keep_ocean: bool = False, resolution: str = "110m"
) -> "sh.SHGrid":
"""
Generates a spatial mask that equals 1.0 in the specified domain
and 0.0 outside of it.
Args:
keep_ocean: If True, the mask is 1.0 on the ocean and 0.0 on land.
If False, the mask is 1.0 on land and 0.0 on the ocean.
resolution: The Cartopy shapefile resolution ('110m', '50m', or '10m').
Returns:
An SHGrid object representing the mask.
"""
# 1. Fetch and prep the land geometry
shpfilename = shpreader.natural_earth(
resolution=resolution, category="physical", name="land"
)
reader = shpreader.Reader(shpfilename)
combined_land = sgeom.MultiPolygon(list(reader.geometries()))
# 'prep' makes point-in-polygon queries significantly faster
prepared_land = prep(combined_land)
# 2. Define the indicator function
def mask_func(point: Tuple[float, float]) -> float:
lat, lon = point
# Wrap [0, 360] to [-180, 180] strictly for Shapely's geometry bounds
wrapped_lon = (lon + 180) % 360 - 180
p = sgeom.Point(wrapped_lon, lat)
is_on_land = prepared_land.contains(p)
if keep_ocean:
return 0.0 if is_on_land else 1.0
else:
return 1.0 if is_on_land else 0.0
# 3. Project the indicator function onto the space's SHGrid
return self.project_function(mask_func)
# ------------------------------------------------------ #
# Private methods #
# ------------------------------------------------------ #
def _coefficient_to_component(self, ulm: sh.SHCoeffs) -> np.ndarray:
"""Maps spherical harmonic coefficients to a component vector."""
return sh.shio.SHCilmToVector(ulm.coeffs)
def _component_to_coefficients(self, c: np.ndarray) -> sh.SHCoeffs:
"""Maps a component vector to spherical harmonic coefficients."""
coeffs = sh.shio.SHVectorToCilm(c)
return sh.SHCoeffs.from_array(
coeffs, normalization=self.normalization, csphase=self.csphase
)
@staticmethod
def _to_vector(lat: float, lon: float) -> np.ndarray:
"""Converts a latitude/longitude pair (in degrees) to a 3D unit vector."""
lat_rad, lon_rad = np.radians(lat), np.radians(lon)
return np.array(
[
np.cos(lat_rad) * np.cos(lon_rad),
np.cos(lat_rad) * np.sin(lon_rad),
np.sin(lat_rad),
]
)
@staticmethod
def _to_latlon(vec: np.ndarray) -> Tuple[float, float]:
"""Converts a 3D vector back to a latitude/longitude pair (in degrees)."""
vec = vec / np.linalg.norm(vec)
lat_rad = np.arcsin(vec[2])
lon_rad = np.arctan2(vec[1], vec[0])
return (np.degrees(lat_rad), np.degrees(lon_rad))
@staticmethod
def _tangent_frame(center_vector: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Return an orthonormal basis for the tangent plane at ``center_vector``."""
if abs(center_vector[2]) < 0.9:
reference = np.array([0.0, 0.0, 1.0])
else:
reference = np.array([1.0, 0.0, 0.0])
tangent_1 = np.cross(reference, center_vector)
tangent_1 /= np.linalg.norm(tangent_1)
tangent_2 = np.cross(center_vector, tangent_1)
tangent_2 /= np.linalg.norm(tangent_2)
return tangent_1, tangent_2
@staticmethod
def _distribute_ring_point_counts(
total_points: int, ring_weights: np.ndarray
) -> np.ndarray:
"""Distribute an exact point budget across azimuthal rings."""
if total_points <= 0:
raise ValueError("Total quadrature point count must be positive.")
n_rings = ring_weights.size
counts = np.ones(n_rings, dtype=int)
remaining = total_points - n_rings
if remaining <= 0:
return counts
positive_weights = np.clip(np.asarray(ring_weights, dtype=float), 0.0, None)
if np.all(positive_weights == 0.0):
counts[:remaining] += 1
return counts
scaled = remaining * positive_weights / positive_weights.sum()
increments = np.floor(scaled).astype(int)
counts += increments
leftover = remaining - int(increments.sum())
if leftover > 0:
remainders = scaled - increments
order = np.argsort(-remainders)
counts[order[:leftover]] += 1
return counts
[docs]
class Sobolev(SymmetricSobolevSpace):
"""
Implementation of the Sobolev space Hˢ on a circle.
"""
def __init__(
self,
lmax: int,
order: float,
scale: float,
/,
radius: float = 1,
grid: str = "DH",
extend: bool = True,
safe: bool = True,
):
"""
Args:
lmax: Maximum degree for the expansions.
order: The Sobolev order, controlling the smoothness of functions.
scale: The Sobolev length-scale.
radius: Radius of the sphere. Defaults to 1.
grid: pyshtools grid type. Defaults to "DH"
extend: If true longitudes wrap fully. Defaults to True.
safe: If true, the class checks for mathematical correctness of operations
where possible.
"""
lebesgue_space = Lebesgue(lmax, radius=radius, grid=grid, extend=extend)
SymmetricSobolevSpace.__init__(self, lebesgue_space, order, scale, safe=safe)
[docs]
@staticmethod
def from_sobolev_parameters(
order: float,
scale: float,
/,
*,
radius: float = 1.0,
grid: str = "DH",
rtol: float = 1e-8,
power_of_two: bool = False,
safe: bool = True,
) -> Sobolev:
"""
Creates an instance with `lmax` chosen based on the Sobolev parameters.
This factory method estimates the spherical harmonic truncation degree
(`lmax`) required to represent the space while meeting a specified
relative tolerance for the truncation error. This is useful when the
required `lmax` is not known a priori.
Args:
order: The order of the Sobolev space, controlling smoothness.
scale: The non-dimensional length-scale for the space.
radius: The radius of the sphere. Defaults to 1.0.
grid: The `pyshtools` grid type (e.g., 'DH'). Defaults to 'DH'.
rtol: The relative tolerance used to determine the `lmax`.
power_of_two: If True, `lmax` is set to the next power of two.
safe: If true, the class checks for mathematical correctness of operations
where possible.
Returns:
An instance of the Sobolev class with a calculated `lmax`.
"""
if order <= 1.0:
raise ValueError("This method is only applicable for orders > 1.0")
summation = 1.0
l = 0
err = 1.0
def sobolev_func(deg):
return (1.0 + (scale / radius) ** 2 * deg * (deg + 1)) ** order
while err > rtol:
l += 1
term = 1 / sobolev_func(l)
summation += term
err = term / summation
if l > 10000:
raise RuntimeError("Failed to converge on a stable lmax.")
if power_of_two:
n = int(np.log2(l))
l = 2 ** (n + 1)
lmax = l
return Sobolev(lmax, order, scale, radius=radius, grid=grid, safe=safe)
[docs]
@classmethod
def from_covariance(
cls,
covariance_function: Callable[[float], float],
order: float,
scale: float,
/,
*,
radius: float = 1.0,
grid: str = "DH",
extend: bool = True,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
power_of_two: bool = False,
safe: bool = True,
) -> Sobolev:
"""
Creates an instance of the Sobolev space with a spherical harmonic truncation degree
(`lmax`) automatically chosen to capture the expected energy of functions drawn from
a specified prior measure.
This factory method calculates the expected squared norm (energy) of a random field
whose spectral variances are defined by the provided `covariance_function`, accounting
for the Sobolev mass-weighting factor. It iteratively adds higher spherical harmonic
degrees until the relative contribution of the next degree drops below the tolerance.
Args:
covariance_function: A callable mapping a Laplacian eigenvalue to its spectral variance.
order: The Sobolev order, controlling the smoothness of functions.
scale: The Sobolev length-scale.
radius: The radius of the sphere. Defaults to 1.0.
grid: The pyshtools spatial grid format to use. Defaults to "DH".
extend: If True, the spatial grid includes both 0 and 360-degree longitudes.
rtol: The relative tolerance for the energy truncation. Defaults to 1e-6.
min_degree: The absolute minimum truncation degree to return. Defaults to 0.
max_degree: An optional safety ceiling for the truncation degree. If convergence
is not reached by this degree, the search terminates and returns this value.
power_of_two: If True, rounds the resulting `lmax` up to the nearest power of two.
safe: If True, enables mathematical correctness checks during operations.
Returns:
Sobolev: A fully instantiated Sobolev space on the sphere with the optimal `lmax`.
Raises:
RuntimeError: If the energy sequence fails to converge within 100,000 iterations
and no `max_degree` is specified.
"""
dummy_space = cls(
max(1, min_degree),
order,
scale,
radius=radius,
grid=grid,
extend=extend,
safe=safe,
)
optimal_degree = dummy_space.estimate_truncation_degree(
covariance_function, rtol=rtol, min_degree=min_degree, max_degree=max_degree
)
if power_of_two:
n = int(np.log2(optimal_degree))
optimal_degree = 2 ** (n + 1)
return cls(
optimal_degree,
order,
scale,
radius=radius,
grid=grid,
extend=extend,
safe=safe,
)
[docs]
@classmethod
def from_heat_kernel_prior(
cls,
kernel_scale: float,
order: float,
scale: float,
/,
*,
radius: float = 1.0,
grid: str = "DH",
extend: bool = True,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
power_of_two: bool = False,
safe: bool = True,
) -> Sobolev:
"""
Creates an instance of the Sobolev space on the sphere, tuned to the expected
energy of a Heat Kernel prior measure.
Args:
kernel_scale: The length-scale parameter of the heat kernel covariance.
order: The Sobolev order defining the function space.
scale: The Sobolev length-scale defining the function space.
radius: The radius of the sphere. Defaults to 1.0.
grid: The pyshtools spatial grid format to use. Defaults to "DH".
extend: If True, the spatial grid includes both 0 and 360-degree longitudes.
rtol: The relative tolerance for the energy truncation. Defaults to 1e-6.
min_degree: The absolute minimum truncation degree to return. Defaults to 0.
max_degree: An optional safety ceiling for the truncation degree.
power_of_two: If True, rounds the resulting `lmax` up to the nearest power of two.
safe: If True, enables mathematical correctness checks during operations.
"""
return cls.from_covariance(
cls.heat_kernel(kernel_scale),
order,
scale,
radius=radius,
grid=grid,
extend=extend,
rtol=rtol,
min_degree=min_degree,
max_degree=max_degree,
power_of_two=power_of_two,
safe=safe,
)
[docs]
@classmethod
def from_sobolev_kernel_prior(
cls,
kernel_order: float,
kernel_scale: float,
order: float,
scale: float,
/,
*,
radius: float = 1.0,
grid: str = "DH",
extend: bool = True,
rtol: float = 1e-6,
min_degree: int = 0,
max_degree: Optional[int] = None,
power_of_two: bool = False,
safe: bool = True,
) -> Sobolev:
"""
Creates an instance of the Sobolev space on the sphere, tuned to the expected
energy of a Sobolev-type prior measure.
Args:
kernel_order: The smoothness order of the Sobolev prior measure.
kernel_scale: The length-scale parameter of the Sobolev prior measure.
order: The Sobolev order defining the function space.
scale: The Sobolev length-scale defining the function space.
radius: The radius of the sphere. Defaults to 1.0.
grid: The pyshtools spatial grid format to use. Defaults to "DH".
extend: If True, the spatial grid includes both 0 and 360-degree longitudes.
rtol: The relative tolerance for the energy truncation. Defaults to 1e-6.
min_degree: The absolute minimum truncation degree to return. Defaults to 0.
max_degree: An optional safety ceiling for the truncation degree.
power_of_two: If True, rounds the resulting `lmax` up to the nearest power of two.
safe: If True, enables mathematical correctness checks during operations.
"""
return cls.from_covariance(
cls.sobolev_kernel(kernel_order, kernel_scale),
order,
scale,
radius=radius,
grid=grid,
extend=extend,
rtol=rtol,
min_degree=min_degree,
max_degree=max_degree,
power_of_two=power_of_two,
safe=safe,
)
# ----------------------------------------- #
# Properties #
# ----------------------------------------- #
@property
def lmax(self) -> int:
"""The maximum spherical harmonic truncation degree."""
return self.underlying_space.lmax
@property
def radius(self) -> float:
"""The radius of the sphere."""
return self.underlying_space.radius
@property
def grid(self) -> str:
"""The `pyshtools` grid type used for spatial representations."""
return self.underlying_space.grid
@property
def sampling(self) -> int:
"""The sampling factor used for spatial representations."""
return self.underlying_space.sampling
@property
def extend(self) -> bool:
"""True if the spatial grid includes both 0 and 360-degree longitudes."""
return self.underlying_space.extend
@property
def normalization(self) -> str:
"""The spherical harmonic normalization convention used ('ortho')."""
return self.underlying_space.normalization
@property
def csphase(self) -> int:
"""The Condon-Shortley phase convention used (1)."""
return self.underlying_space.csphase
@property
def grid_type(self) -> str:
"""
Returns the pyshtools grid type.
"""
return self.underlying_space.grid_type
# -------------------------------------------------- #
# Public methods #
# -------------------------------------------------- #
[docs]
def with_order(self, order: float) -> Sobolev:
return Sobolev(
self.lmax,
order,
self.scale,
radius=self.radius,
grid=self.grid,
extend=self.extend,
)
[docs]
def with_degree(self, degree: int) -> Sobolev:
return Sobolev(
degree,
self.order,
self.scale,
radius=self.radius,
grid=self.grid,
extend=self.extend,
)
[docs]
def project_function(self, f: Callable[[(float, float)], float]) -> np.ndarray:
"""
Returns an element of the space by projecting a given function.
Args:
f: A function that takes a point `(lat, lon)` and returns a value.
"""
return self.underlying_space.project_function(f)
[docs]
def to_coefficients(self, u: sh.SHGrid) -> sh.SHCoeffs:
"""Maps a function vector to its spherical harmonic coefficients."""
return self.underlying_space.to_coefficients(u)
[docs]
def from_coefficients(self, ulm: sh.SHCoeffs) -> sh.SHGrid:
"""Maps spherical harmonic coefficients to a function vector."""
return self.underlying_space.from_coefficients(ulm)
[docs]
def sample_power_measure(
self,
measure,
n_samples,
/,
*,
lmin=None,
lmax=None,
parallel: bool = False,
n_jobs: int = -1,
):
"""
Takes in a Gaussian measure on the space, draws n_samples from
and returns samples for the spherical harmonic power at degrees in
the indicated range.
"""
return self.underlying_space.sample_power_measure(
measure, n_samples, lmin=lmin, lmax=lmax, parallel=parallel, n_jobs=n_jobs
)
[docs]
def ax(self, a: float, x: sh.SHGrid) -> None:
self.underlying_space.ax(a, x)
[docs]
def axpy(self, a: float, x: sh.SHGrid, y: sh.SHGrid) -> None:
self.underlying_space.axpy(a, x, y)
[docs]
def to_coefficient_operator(self, lmax: int, /, *, lmin: int = 0):
r"""
Returns a LinearOperator mapping a function to its spherical harmonic coefficients.
The operator maps an element of the Hilbert space to a vector in $\mathbb{R}^k$.
The coefficients in the output vector are ordered by degree $l$ (major)
and order $m$ (minor), from $-l$ to $+l$.
**Ordering:**
.. math::
u = [u_{0,0}, \quad u_{1,-1}, u_{1,0}, u_{1,1}, \quad u_{2,-2}, \dots, u_{2,2}, \quad \dots]
(assuming `lmin=0`).
Args:
lmax: The maximum spherical harmonic degree to include in the output.
lmin: The minimum spherical harmonic degree to include. Defaults to 0.
Returns:
A LinearOperator mapping `SHGrid` -> `numpy.ndarray`.
"""
l2_operator = self.underlying_space.to_coefficient_operator(lmax, lmin=lmin)
return LinearOperator.from_formal_adjoint(
self, l2_operator.codomain, l2_operator
)
[docs]
def from_coefficient_operator(self, lmax: int, /, *, lmin: int = 0):
r"""
Returns a LinearOperator mapping a vector of coefficients to a function.
The operator maps a vector in $\mathbb{R}^k$ to an element of the Hilbert space.
The input vector must follow the standard $l$-major, $m$-minor ordering.
**Ordering:**
.. math::
v = [u_{0,0}, \quad u_{1,-1}, u_{1,0}, u_{1,1}, \quad u_{2,-2}, \dots, u_{2,2}, \quad \dots]
(assuming `lmin=0`).
Args:
lmax: The maximum spherical harmonic degree expected in the input.
lmin: The minimum spherical harmonic degree expected. Defaults to 0.
Returns:
A LinearOperator mapping `numpy.ndarray` -> `SHGrid`.
"""
l2_operator = self.underlying_space.from_coefficient_operator(lmax, lmin=lmin)
return LinearOperator.from_formal_adjoint(l2_operator.domain, self, l2_operator)
[docs]
def point_evaluation_operator(
self,
points: List[Tuple[float, float]],
/,
*,
matrix_free: bool = False,
parallel: bool = False,
n_jobs: int = -1,
) -> LinearOperator:
"""
Returns a linear operator that evaluates a function at a list of points.
Args:
points: A list of (latitude, longitude) tuples.
matrix_free: If True, evaluates the operator on-the-fly to save memory.
parallel: If True, distributes the matrix-free evaluations across CPU cores.
n_jobs: Number of parallel jobs. -1 uses all available cores.
"""
if not matrix_free:
return super().point_evaluation_operator(points)
lats = [p[0] for p in points]
lons = [p[1] for p in points]
codomain = EuclideanSpace(len(points))
# Heuristic chunking: Target ~4 chunks per worker to balance load and overhead
n_points = len(points)
num_chunks = min(n_points, abs(n_jobs) * 4 if n_jobs > 0 else 16)
chunks = np.array_split(range(n_points), num_chunks)
def mapping(u: sh.SHGrid) -> np.ndarray:
coeffs = self.to_coefficients(u)
if not parallel:
raw_vals = coeffs.expand(lat=lats, lon=lons, degrees=True)
return np.array(raw_vals, dtype=float).flatten()
def compute_fwd_chunk(indices):
chunk_lats = [lats[i] for i in indices]
chunk_lons = [lons[i] for i in indices]
raw = coeffs.expand(lat=chunk_lats, lon=chunk_lons, degrees=True)
return np.array(raw, dtype=float).flatten()
results = Parallel(n_jobs=n_jobs)(
delayed(compute_fwd_chunk)(chunk) for chunk in chunks if len(chunk) > 0
)
return np.concatenate(results)
def adjoint_mapping(y: np.ndarray) -> sh.SHGrid:
from pygeoinf.linear_forms import LinearForm
if not parallel:
total_components = np.zeros(self.dim)
for i, pt in enumerate(points):
if y[i] != 0.0:
total_components += y[i] * self.dirac(pt).components
return self.from_dual(LinearForm(self, components=total_components))
def compute_adj_chunk(indices):
local_comps = np.zeros(self.dim)
for i in indices:
if y[i] != 0.0:
local_comps += y[i] * self.dirac(points[i]).components
return local_comps
results = Parallel(n_jobs=n_jobs)(
delayed(compute_adj_chunk)(chunk) for chunk in chunks if len(chunk) > 0
)
total_components = sum(results)
return self.from_dual(LinearForm(self, components=total_components))
return LinearOperator(self, codomain, mapping, adjoint_mapping=adjoint_mapping)
[docs]
def path_average_operator(
self,
paths: List[Tuple[Tuple[float, float], Tuple[float, float]]],
/,
*,
n_points: Optional[int] = None,
matrix_free: bool = False,
parallel: bool = False,
n_jobs: int = -1,
lazy_quadrature: bool = False,
) -> LinearOperator:
"""
Constructs a tomographic operator mapping a function field to its
line integrals along a set of geodesic paths.
Args:
paths: A list of start and end point pairs defining the geodesics.
n_points: The number of quadrature points per path.
matrix_free: If True, evaluates the integrals on-the-fly to save memory.
parallel: If True, distributes the matrix-free evaluations across CPU cores.
n_jobs: Number of parallel jobs. -1 uses all available cores.
lazy_quadrature: If True, computes quadrature points on-the-fly per chunk
to prevent memory blowouts when evaluating massive numbers of paths.
"""
if not matrix_free:
return super().path_average_operator(paths, n_points=n_points)
n_paths = len(paths)
codomain = EuclideanSpace(n_paths)
# Determine chunking strategy (aim for reasonable sized blocks)
num_chunks = min(n_paths, abs(n_jobs) * 4 if n_jobs > 0 else 16)
chunks = np.array_split(range(n_paths), num_chunks)
# Helper to dynamically get quadrature points for a single path
def get_quad(p1, p2):
if n_points is None:
_, temp_weights = self.geodesic_quadrature(p1, p2, n_points=2)
arc_length = np.sum(temp_weights)
pts_count = int(np.ceil((arc_length / self.scale) * 2.0))
pts_count = max(2, pts_count)
else:
pts_count = n_points
return self.geodesic_quadrature(p1, p2, pts_count)
if not lazy_quadrature:
# =========================================================
# MODE 1: PRECOMPUTE ALL (Fastest, High Memory)
# =========================================================
def compute_quad_chunk(indices):
chunk_pts, chunk_wts = [], []
for i in indices:
pts, wts = get_quad(*paths[i])
chunk_pts.append(pts)
chunk_wts.append(wts)
return chunk_pts, chunk_wts
if parallel:
quad_results = Parallel(n_jobs=n_jobs)(
delayed(compute_quad_chunk)(chunk)
for chunk in chunks
if len(chunk) > 0
)
all_quad_points, all_quad_weights = [], []
for c_pts, c_wts in quad_results:
all_quad_points.extend(c_pts)
all_quad_weights.extend(c_wts)
else:
all_quad_points, all_quad_weights = compute_quad_chunk(range(n_paths))
flat_lats, flat_lons, path_slices = [], [], []
idx = 0
for pts in all_quad_points:
pts_count = len(pts)
for pt in pts:
flat_lats.append(pt[0])
flat_lons.append(pt[1])
path_slices.append((idx, idx + pts_count))
idx += pts_count
def mapping(u: sh.SHGrid) -> np.ndarray:
coeffs = self.to_coefficients(u)
if not parallel:
raw = coeffs.expand(lat=flat_lats, lon=flat_lons, degrees=True)
flat = np.array(raw, dtype=float).flatten()
res = np.zeros(n_paths)
for i, (start, end) in enumerate(path_slices):
res[i] = np.sum(flat[start:end] * all_quad_weights[i])
return res
def compute_fwd_chunk(indices):
local_lats, local_lons, local_wts, local_slices = [], [], [], []
local_idx = 0
for i in indices:
pts, wts = all_quad_points[i], all_quad_weights[i]
for pt in pts:
local_lats.append(pt[0])
local_lons.append(pt[1])
c_len = len(pts)
local_slices.append((local_idx, local_idx + c_len))
local_wts.append(wts)
local_idx += c_len
raw = coeffs.expand(lat=local_lats, lon=local_lons, degrees=True)
flat = np.array(raw, dtype=float).flatten()
res = np.zeros(len(indices))
for k, (start, end) in enumerate(local_slices):
res[k] = np.sum(flat[start:end] * local_wts[k])
return res
results = Parallel(n_jobs=n_jobs)(
delayed(compute_fwd_chunk)(chunk)
for chunk in chunks
if len(chunk) > 0
)
return np.concatenate(results)
def adjoint_mapping(y: np.ndarray) -> sh.SHGrid:
from pygeoinf.linear_forms import LinearForm
if not parallel:
total_components = np.zeros(self.dim)
for i in range(n_paths):
if y[i] != 0.0:
path_comps = np.zeros(self.dim)
for pt, w in zip(all_quad_points[i], all_quad_weights[i]):
path_comps += w * self.dirac(pt).components
total_components += y[i] * path_comps
return self.from_dual(LinearForm(self, components=total_components))
def compute_adj_chunk(indices):
local_comps = np.zeros(self.dim)
for i in indices:
if y[i] != 0.0:
path_comps = np.zeros(self.dim)
for pt, w in zip(all_quad_points[i], all_quad_weights[i]):
path_comps += w * self.dirac(pt).components
local_comps += y[i] * path_comps
return local_comps
results = Parallel(n_jobs=n_jobs)(
delayed(compute_adj_chunk)(chunk)
for chunk in chunks
if len(chunk) > 0
)
return self.from_dual(LinearForm(self, components=sum(results)))
else:
# =========================================================
# MODE 2: LAZY EVALUATION (Slightly slower, Low Memory)
# =========================================================
def mapping(u: sh.SHGrid) -> np.ndarray:
coeffs = self.to_coefficients(u)
def compute_lazy_fwd_chunk(indices):
local_lats, local_lons, local_wts, local_slices = [], [], [], []
local_idx = 0
# 1. Compute geometry dynamically for this chunk
for i in indices:
pts, wts = get_quad(*paths[i])
for pt in pts:
local_lats.append(pt[0])
local_lons.append(pt[1])
c_len = len(pts)
local_slices.append((local_idx, local_idx + c_len))
local_wts.append(wts)
local_idx += c_len
# 2. Vectorized pyshtools evaluation just for this chunk's points
raw = coeffs.expand(lat=local_lats, lon=local_lons, degrees=True)
flat = np.array(raw, dtype=float).flatten()
# 3. Sum up the line integrals
res = np.zeros(len(indices))
for k, (start, end) in enumerate(local_slices):
res[k] = np.sum(flat[start:end] * local_wts[k])
return res
if parallel:
results = Parallel(n_jobs=n_jobs)(
delayed(compute_lazy_fwd_chunk)(chunk)
for chunk in chunks
if len(chunk) > 0
)
return np.concatenate(results)
else:
return np.concatenate(
[compute_lazy_fwd_chunk(c) for c in chunks if len(c) > 0]
)
def adjoint_mapping(y: np.ndarray) -> sh.SHGrid:
from pygeoinf.linear_forms import LinearForm
def compute_lazy_adj_chunk(indices):
local_comps = np.zeros(self.dim)
for i in indices:
if y[i] != 0.0:
pts, wts = get_quad(*paths[i])
path_comps = np.zeros(self.dim)
for pt, w in zip(pts, wts):
path_comps += w * self.dirac(pt).components
local_comps += y[i] * path_comps
return local_comps
if parallel:
results = Parallel(n_jobs=n_jobs)(
delayed(compute_lazy_adj_chunk)(chunk)
for chunk in chunks
if len(chunk) > 0
)
total_components = sum(results)
else:
total_components = sum(
[compute_lazy_adj_chunk(c) for c in chunks if len(c) > 0]
)
return self.from_dual(LinearForm(self, components=total_components))
return LinearOperator(self, codomain, mapping, adjoint_mapping=adjoint_mapping)
[docs]
@staticmethod
def iris_stations(n_stations: int = None, include_names: bool = False):
"""
Convenience method to retrieve a globally distributed set of
Global Seismograph Network (GSN) stations from IRIS.
Args:
n_stations: The number of random stations to return. If None,
returns the entire available network (~177 stations).
include_names: If True, returns (Name, Latitude, Longitude).
If False, returns (Latitude, Longitude) tuples.
Returns:
A list of tuples representing the station coordinates in degrees.
"""
return load_gsn_stations(n_stations=n_stations, include_names=include_names)
[docs]
@staticmethod
def random_earthquakes(n_points: int, min_magnitude: float = 5.0):
"""
Returns a random sample of real, sensible earthquake locations.
This provides a highly clustered, non-uniform spatial distribution
along tectonic boundaries, perfect for testing robust spatial operators.
Args:
n_points: The exact number of locations to return.
min_magnitude: The minimum magnitude threshold for the catalog.
Returns:
A list of (Latitude, Longitude) tuples in degrees.
"""
from pygeoinf.datasets import sample_earthquakes
# The FDSN API returns (lat, lon, depth). We strip the depth
# since we are operating on the 2D surface of the sphere.
events = sample_earthquakes(n_points, min_magnitude=min_magnitude)
return [(lat, lon) for lat, lon, depth in events]
[docs]
def random_domain_points(
self, n: int, /, *, keep_ocean: bool = True, resolution: str = "110m"
) -> List[Tuple[float, float]]:
"""
Generates a list of random points strictly bounded to either the land or the ocean.
"""
return self.underlying_space.random_domain_points(
n, keep_ocean=keep_ocean, resolution=resolution
)
[docs]
def domain_mask(
self, /, *, keep_ocean: bool = False, resolution: str = "110m"
) -> "sh.SHGrid":
"""
Generates a spatial mask that equals 1.0 in the specified domain
and 0.0 outside of it.
Args:
keep_ocean: If True, the mask is 1.0 on the ocean and 0.0 on land.
If False, the mask is 1.0 on land and 0.0 on the ocean.
resolution: The Cartopy shapefile resolution ('110m', '50m', or '10m').
Returns:
An SHGrid object representing the mask.
"""
return self.underlying_space.domain_mask(
keep_ocean=keep_ocean, resolution=resolution
)
# -------------------------------------------------- #
# Associated plotting methods #
# -------------------------------------------------- #
def _unwrap_axes(
ax_input: Optional[Union[GeoAxes, Tuple[GeoAxes, Any]]],
projection: Optional[Projection] = None,
) -> Tuple[plt.Figure, GeoAxes]:
"""Helper to safely unwrap chained axes tuples and initialize figures."""
if ax_input is None:
fig, ax = create_map_figure(projection=projection)
return fig, ax
ax = ax_input[0] if isinstance(ax_input, tuple) else ax_input
return ax.get_figure(), ax
[docs]
def plot(
u: "SHGrid",
/,
*,
ax: Optional[Union[GeoAxes, Tuple[GeoAxes, Any]]] = None,
projection: Optional[Projection] = None,
contour: bool = False,
cmap: str = "RdBu",
coasts: bool = False,
rivers: bool = False,
borders: bool = False,
map_extent: Optional[List[float]] = None,
gridlines: bool = True,
gridlines_kwargs: Optional[dict] = None,
symmetric: Union[bool, float] = False,
contour_lines: bool = False,
contour_lines_kwargs: Optional[dict] = None,
num_levels: int = 10,
colorbar: bool = False,
colorbar_kwargs: Optional[dict] = None,
**kwargs,
) -> Tuple[GeoAxes, Any]:
"""Creates a high-quality map plot of a spherical harmonic function using Cartopy."""
fig, ax = _unwrap_axes(ax, projection)
lons, lats = u.lons(), u.lats()
if map_extent is not None:
ax.set_extent(map_extent, crs=ccrs.PlateCarree())
if coasts:
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
if rivers:
ax.add_feature(cfeature.RIVERS, linewidth=0.8)
if borders:
ax.add_feature(cfeature.BORDERS, linewidth=0.8)
kwargs.setdefault("cmap", cmap)
if symmetric:
scale_factor = 1.2 if isinstance(symmetric, bool) else float(symmetric)
data_max = scale_factor * np.nanmax(np.abs(u.data))
kwargs.setdefault("vmin", -data_max)
kwargs.setdefault("vmax", data_max)
if "levels" in kwargs:
levels = kwargs.pop("levels")
else:
vmin = kwargs.get("vmin", np.nanmin(u.data))
vmax = kwargs.get("vmax", np.nanmax(u.data))
levels = np.linspace(vmin, vmax, num_levels)
im: Any
if contour:
kwargs.pop("vmin", None)
kwargs.pop("vmax", None)
im = ax.contourf(
lons, lats, u.data, transform=ccrs.PlateCarree(), levels=levels, **kwargs
)
else:
im = ax.pcolormesh(lons, lats, u.data, transform=ccrs.PlateCarree(), **kwargs)
if contour_lines:
cl_kwargs = contour_lines_kwargs.copy() if contour_lines_kwargs else {}
cl_kwargs.setdefault("colors", "k")
cl_kwargs.setdefault("linewidths", 0.5)
ax.contour(
lons, lats, u.data, transform=ccrs.PlateCarree(), levels=levels, **cl_kwargs
)
if gridlines:
gl_opts = gridlines_kwargs.copy() if gridlines_kwargs else {}
lat_interval = gl_opts.pop("lat_interval", 30)
lon_interval = gl_opts.pop("lon_interval", 30)
gl_opts.setdefault("linestyle", "--")
gl_opts.setdefault("draw_labels", True)
gl_opts.setdefault("dms", True)
gl_opts.setdefault("x_inline", False)
gl_opts.setdefault("y_inline", False)
gl = ax.gridlines(**gl_opts)
gl.xlocator = mticker.MultipleLocator(lon_interval)
gl.ylocator = mticker.MultipleLocator(lat_interval)
gl.xformatter = LongitudeFormatter()
gl.yformatter = LatitudeFormatter()
ax.gridliner = gl
if colorbar and fig:
cb_opts = colorbar_kwargs.copy() if colorbar_kwargs else {}
cb_opts.setdefault("orientation", "horizontal")
cb_opts.setdefault("shrink", 0.7)
cb_opts.setdefault("pad", 0.05)
cb = fig.colorbar(im, ax=ax, **cb_opts)
im.colorbar = cb
return ax, im
[docs]
def plot_points(
points: List[Tuple[float, float]],
/,
*,
data: Optional[Union[List[float], np.ndarray]] = None,
ax: Optional[Union[GeoAxes, Tuple[GeoAxes, Any]]] = None,
projection: Optional[Projection] = None,
cmap: str = "RdBu",
color: str = "red",
s: float = 20,
marker: str = "o",
edgecolors: str = "none",
zorder: int = 5,
coasts: bool = False,
rivers: bool = False,
borders: bool = False,
map_extent: Optional[List[float]] = None,
gridlines: bool = True,
gridlines_kwargs: Optional[dict] = None,
symmetric: Union[bool, float] = False,
colorbar: bool = False,
colorbar_kwargs: Optional[dict] = None,
**kwargs,
) -> Tuple[GeoAxes, Any]:
"""Plots discrete observation points on a map."""
fig, ax = _unwrap_axes(ax, projection)
# Only set global if we initialized a fresh axis and no extent is provided
if map_extent is None and not ax.get_extent():
ax.set_global()
elif map_extent is not None:
ax.set_extent(map_extent, crs=ccrs.PlateCarree())
if coasts:
ax.add_feature(cfeature.COASTLINE, linewidth=0.8, zorder=10)
if rivers:
ax.add_feature(cfeature.RIVERS, linewidth=0.8, zorder=10)
if borders:
ax.add_feature(cfeature.BORDERS, linewidth=0.8, zorder=10)
lats = [p[0] for p in points]
lons = [p[1] for p in points]
if data is not None:
if len(data) != len(points):
raise ValueError("The length of 'data' must match the length of 'points'.")
c = data
kwargs.setdefault("cmap", cmap)
if symmetric:
scale_factor = 1.2 if isinstance(symmetric, bool) else float(symmetric)
data_max = scale_factor * np.nanmax(np.abs(data))
kwargs.setdefault("vmin", -data_max)
kwargs.setdefault("vmax", data_max)
else:
c = color
sc = ax.scatter(
lons,
lats,
c=c,
s=s,
marker=marker,
edgecolors=edgecolors,
zorder=zorder,
transform=ccrs.PlateCarree(),
**kwargs,
)
if gridlines:
gl_opts = gridlines_kwargs.copy() if gridlines_kwargs else {}
lat_interval = gl_opts.pop("lat_interval", 30)
lon_interval = gl_opts.pop("lon_interval", 30)
gl_opts.setdefault("linestyle", "--")
gl_opts.setdefault("draw_labels", True)
gl_opts.setdefault("zorder", 10)
gl = ax.gridlines(**gl_opts)
gl.xlocator = mticker.MultipleLocator(lon_interval)
gl.ylocator = mticker.MultipleLocator(lat_interval)
gl.xformatter = LongitudeFormatter()
gl.yformatter = LatitudeFormatter()
ax.gridliner = gl
if colorbar and data is not None and fig:
cb_opts = colorbar_kwargs.copy() if colorbar_kwargs else {}
cb_opts.setdefault("orientation", "horizontal")
cb_opts.setdefault("shrink", 0.7)
cb_opts.setdefault("pad", 0.05)
cb = fig.colorbar(sc, ax=ax, **cb_opts)
sc.colorbar = cb
return ax, sc
[docs]
def plot_geodesic(
p1: Tuple[float, float],
p2: Tuple[float, float],
/,
*,
ax: Optional[Union[GeoAxes, Tuple[GeoAxes, Any]]] = None,
**kwargs,
) -> Tuple[GeoAxes, Any]:
"""Plots a geodesic (great-circle) curve between two points on the sphere."""
_, ax = _unwrap_axes(ax)
kwargs.setdefault("color", "black")
kwargs.setdefault("linewidth", 2)
lat1, lon1 = p1
lat2, lon2 = p2
# ax.plot returns a list of Line2D objects; we extract the first one
(line,) = ax.plot([lon1, lon2], [lat1, lat2], transform=ccrs.Geodetic(), **kwargs)
return ax, line
[docs]
def plot_geodesic_network(
paths: List[Tuple[Tuple[float, float], Tuple[float, float]]],
/,
*,
ax: Optional[Union[GeoAxes, Tuple[GeoAxes, Any]]] = None,
plot_sources: bool = True,
plot_receivers: bool = True,
source_kwargs: Optional[dict] = None,
receiver_kwargs: Optional[dict] = None,
**kwargs,
) -> Tuple[GeoAxes, List[Any]]:
"""Plots a network of intersecting geodesic paths onto a Cartopy map."""
fig, ax = _unwrap_axes(ax)
if not ax.get_extent():
ax.set_global()
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
kwargs.setdefault("color", "black")
kwargs.setdefault("linewidth", 0.8)
kwargs.setdefault("alpha", 0.5)
artists = []
for p1, p2 in paths:
_, line = plot_geodesic(p1, p2, ax=ax, **kwargs)
artists.append(line)
if plot_sources or plot_receivers:
sources = list(set([tuple(p[0]) for p in paths]))
receivers = list(set([tuple(p[1]) for p in paths]))
if plot_sources and sources:
src_lats, src_lons = zip(*sources)
src_style = source_kwargs.copy() if source_kwargs else {}
src_style.setdefault("marker", "*")
src_style.setdefault("color", "gold")
src_style.setdefault("s", 150)
src_style.setdefault("edgecolor", "black")
src_style.setdefault("zorder", 5)
sc_src = ax.scatter(
src_lons, src_lats, transform=ccrs.Geodetic(), **src_style
)
artists.append(sc_src)
if plot_receivers and receivers:
rec_lats, rec_lons = zip(*receivers)
rec_style = receiver_kwargs.copy() if receiver_kwargs else {}
rec_style.setdefault("marker", "o")
rec_style.setdefault("color", "red")
rec_style.setdefault("s", 50)
rec_style.setdefault("edgecolor", "white")
rec_style.setdefault("zorder", 5)
sc_rec = ax.scatter(
rec_lons, rec_lats, transform=ccrs.Geodetic(), **rec_style
)
artists.append(sc_rec)
return ax, artists