"""Numerical evaluation pipeline for DiagramTerm objects.
Provides the 4-step workflow:
1. Coupling coefficients — handled by DiagramTerm.evaluate_coupling() (existing)
2. Spatial structure analysis — analyze_spatial()
3. Propagator evaluation — PropagatorModel + PropagatorCache
4. Contraction & integration — DiagramIntegrand
"""
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable
import numpy as np
from .expressions import Propagator
# ---------------------------------------------------------------------------
# Step 2: Spatial Structure Analysis
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class SpatialStructure:
"""Result of analyzing R-propagator connectivity and time orderings.
R propagators of the form ``δ(n−n') Θ(t−t') R_time(t,t')`` identify
directions along R-chains and impose causal time orderings.
"""
#: Groups of spatial points connected by R propagators (share direction).
direction_groups: tuple[frozenset[str], ...]
#: spatial_point → representative direction variable name (e.g. ``'n_x'``).
direction_map: dict[str, str]
#: Time ordering pairs: ``(earlier_point, later_point)`` from R causality.
#: ``R(a, b)`` with ``Θ(t_a − t_b)`` means ``t_b ≤ t_a``.
time_orderings: tuple[tuple[str, str], ...]
#: R propagators as ``(spatial_left, spatial_right)`` pairs.
r_propagators: tuple[tuple[str, str], ...]
#: C propagators: ``(spatial_left, spatial_right, index_left, index_right)``.
c_propagators: tuple[tuple[str, str, str | None, str | None], ...]
#: Time integration variables, topologically sorted (innermost first).
#: For an ``equal_time`` non-local vertex, only the canonical
#: representative leg appears here; the other (m-1) legs of that
#: vertex are removed and their time values are read through the
#: ``equal_time_aliases`` map below.
time_integration_vars: tuple[str, ...]
#: Surviving direction integration variables (one per R-component that
#: contains only integration points — typically empty).
direction_integration_vars: tuple[str, ...]
#: External (non-integration) spatial points.
external_points: tuple[str, ...]
#: Maps non-representative internal spatial labels to their canonical
#: time representative, propagated from
#: ``DiagramTerm.equal_time_aliases``. The integration loops look up
#: every leg's time via ``equal_time_aliases.get(label, label)`` so
#: aliased legs share a single integration variable while keeping
#: independent spatial labels. Empty tuple ⇒ no aliasing (original
#: cross-spacetime cumulant behaviour).
equal_time_aliases: tuple[tuple[str, str], ...] = ()
#: ``(partner_label, leg_label)`` pairs identifying R-propagators
#: whose factor has been absorbed into an upstream
#: ``NonLocalVertex(already_R_contracted=True)`` callable.
#: Propagated straight from ``DiagramTerm.r_absorbed_pairs``. The
#: integrand R-product loop iterates over this set to **skip** the
#: matching propagators (so the factor becomes 1 rather than
#: ``R(t, t) = 0`` under Itô after the leg time aliases onto the
#: partner's). The leg's time / direction collapse onto the
#: partner's via the accompanying ``equal_time_aliases`` entries.
r_absorbed_pairs: tuple[tuple[str, str], ...] = ()
def _kept_r_propagators(
spatial: "SpatialStructure",
) -> tuple[tuple[str, str], ...]:
"""Return ``spatial.r_propagators`` minus any pair listed in
``spatial.r_absorbed_pairs``.
Absorbed R-propagators contribute to direction grouping and time
ordering (the leg label is union-find'd with its partner via the
original R-propagator) but their R-factor has been folded into an
upstream ``κ^(m)_R`` callable. The integrand R-product loops must
skip them; calling this helper centralises the filter so every
factor-multiplication site applies the same rule.
"""
if not spatial.r_absorbed_pairs:
return spatial.r_propagators
absorbed = set(spatial.r_absorbed_pairs)
return tuple(p for p in spatial.r_propagators if p not in absorbed)
def _select_C_batch(
C_batch: np.ndarray,
a: int | None,
b: int | None,
) -> np.ndarray:
"""Select a per-sample C component from either diagonal or full batches."""
C_arr = np.asarray(C_batch)
if C_arr.ndim == 3:
if a is not None and b is not None:
return C_arr[:, a, b]
return np.einsum("iaa->i", C_arr)
if C_arr.ndim != 2:
raise ValueError(
f"C batch must have shape (n, N) or (n, N, N); got {C_arr.shape}."
)
if a is not None and b is not None:
if a != b:
return np.zeros(C_arr.shape[0], dtype=C_arr.dtype)
return C_arr[:, a]
return C_arr.sum(axis=1)
def _topological_sort_times(
integration_vars: tuple[str, ...],
time_orderings: list[tuple[str, str]],
) -> list[str]:
"""Topological sort of integration time variables using causal ordering.
Returns variables ordered so that the *innermost* integration variable
(the one with the tightest bounds) comes first. This is the reverse
of the DAG order: leaves (earliest times) first.
"""
# Build adjacency: earlier → later
adj: dict[str, list[str]] = defaultdict(list)
in_degree: dict[str, int] = {v: 0 for v in integration_vars}
int_set = set(integration_vars)
for earlier, later in time_orderings:
if earlier in int_set and later in int_set:
adj[later].append(earlier)
in_degree[earlier] = in_degree.get(earlier, 0) + 1
# Kahn's algorithm — start from nodes with in_degree 0 (latest times)
queue = [v for v in integration_vars if in_degree[v] == 0]
result: list[str] = []
while queue:
queue.sort() # deterministic tie-breaking
node = queue.pop(0)
result.append(node)
for neighbor in adj.get(node, []):
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# result is latest-first; reverse to get innermost (earliest) first
result.reverse()
return result
[docs]
def analyze_spatial(dt: "DiagramTerm") -> SpatialStructure:
"""Analyze a DiagramTerm's propagator topology.
Determines direction identification groups from R-propagator connectivity,
time ordering constraints from R causality, and which integration variables
survive after δ-function elimination.
"""
integration_set = set(dt.integration_vars)
# Equal-time non-local vertices share a single time integration
# variable across their m legs. Build the alias map (leg → canonical
# representative) and exclude the non-representatives from the
# surviving time-integration set so the Jacobian is one factor of
# ``width`` per vertex, not ``width^m``. Spatial labels remain
# independent (each leg keeps its own direction / position).
equal_time_aliases = dict(getattr(dt, "equal_time_aliases", ()) or ())
time_integration_set = set(integration_set)
for non_rep in equal_time_aliases:
time_integration_set.discard(non_rep)
# Classify propagators
r_props: list[tuple[str, str]] = []
c_props: list[tuple[str, str, str | None, str | None]] = []
for p in dt.propagators:
if p.kind == "R":
r_props.append((p.spatial_left, p.spatial_right))
else:
c_props.append((p.spatial_left, p.spatial_right, p.index_left, p.index_right))
# --- Direction groups via union-find on R propagators ---
all_points: set[str] = set()
for p in dt.propagators:
all_points.add(p.spatial_left)
all_points.add(p.spatial_right)
parent: dict[str, str] = {p: p for p in all_points}
def find(x: str) -> str:
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a: str, b: str) -> None:
ra, rb = find(a), find(b)
if ra != rb:
# Prefer external point as root
if rb not in integration_set and ra in integration_set:
parent[ra] = rb
else:
parent[rb] = ra
for sl, sr in r_props:
union(sl, sr)
# Build groups
groups_dict: dict[str, set[str]] = defaultdict(set)
for p in all_points:
groups_dict[find(p)].add(p)
direction_groups = tuple(frozenset(g) for g in groups_dict.values())
# Direction map: each point → n_{representative}
direction_map: dict[str, str] = {}
direction_integration_vars: list[str] = []
for group in direction_groups:
# Pick representative: prefer external point, then lexicographic
external = sorted(p for p in group if p not in integration_set)
if external:
rep = external[0]
else:
rep = sorted(group)[0]
direction_integration_vars.append(rep)
dir_name = f"n_{rep}"
for p in group:
direction_map[p] = dir_name
# --- Time orderings from R causality ---
# R(a, b): Θ(t_a − t_b) → t_b ≤ t_a → (earlier=b, later=a)
time_orderings: list[tuple[str, str]] = []
for sl, sr in r_props:
time_orderings.append((sr, sl)) # (earlier, later)
# --- Topological sort of integration time variables ---
# Only the canonical equal-time representatives (and any non-equal-time
# internals) appear in time_integration_set; the topo sort drops the
# filtered non-representatives implicitly because they're absent from
# the input list and from any (earlier, later) entry whose endpoint is
# filtered (rewritten below). Time-orderings that touch a filtered
# label are remapped to its canonical representative so causality
# between an equal-time vertex and the rest of the diagram remains
# captured by R-propagators on the surviving (representative) legs.
time_orderings_collapsed: list[tuple[str, str]] = []
for earlier, later in time_orderings:
earlier_alias = equal_time_aliases.get(earlier, earlier)
later_alias = equal_time_aliases.get(later, later)
if earlier_alias == later_alias:
# Same physical time after collapse — no ordering needed.
continue
time_orderings_collapsed.append((earlier_alias, later_alias))
sorted_time_vars = _topological_sort_times(
tuple(sorted(time_integration_set)), time_orderings_collapsed
)
# --- External points ---
external_points = tuple(sorted(p for p in all_points if p not in integration_set))
return SpatialStructure(
direction_groups=direction_groups,
direction_map=direction_map,
time_orderings=tuple(time_orderings_collapsed),
r_propagators=tuple(r_props),
c_propagators=tuple(c_props),
time_integration_vars=tuple(sorted_time_vars),
direction_integration_vars=tuple(sorted(direction_integration_vars)),
external_points=external_points,
equal_time_aliases=tuple(sorted(equal_time_aliases.items())),
r_absorbed_pairs=tuple(getattr(dt, "r_absorbed_pairs", ()) or ()),
)
# ---------------------------------------------------------------------------
# Step 3: Propagator Model and Cache
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class PropagatorModel:
"""Physical propagator functions for numerical evaluation.
The R propagator has the form::
R_{ab}(n,t; n',t') = δ(n−n') Θ(t−t') R_time(t,t') δ_{ab} [iso_R]
R_{ab}(n,t; n',t') = δ(n−n') Θ(t−t') R_time_ab(t,t') [general]
The C propagator is computed from R and the 2-point cumulant κ^(2)::
C_{ab}(n₁,t₁; n₂,t₂) = ∫∫ R_time(t₁,λ') κ_{ab}(n₁,λ'; n₂,λ'')
R_time(t₂,λ'') dλ' dλ''
Attributes:
R_time: Callable ``(t_left, t_right) → scalar`` when ``iso_R=True``,
or ``(t_left, t_right) → (N, N) array`` otherwise.
kappa2: Callable ``(n1, t1, n2, t2) → (N, N) array``.
The 2-point cumulant.
n_components: Number of field components.
iso_R: Whether R is isotropic (proportional to identity).
diag_C: Whether C is diagonal in component indices.
t_min: Lower bound for the λ integrals in C computation.
"""
R_time: Callable
kappa2: Callable
n_components: int
iso_R: bool = True
diag_C: bool = False
t_min: float = 0.0
# Optional white-noise component of the source-field correlation:
#
# κ²(t1, x1; t2, x2) = kappa2_smooth + δ(t1 − t2) · sigma2(t1; x1, x2).
#
# ``sigma2`` takes a single time argument (the δ collapses both
# times) and returns an (N, N) matrix. When provided, ``C`` picks
# up an additional 1-D integral
#
# C_white(t1, t2; x1, x2) =
# ∫_{t_min}^{min(t1,t2)} R(t1, τ) σ²(τ; x1, x2) R(t2, τ) dτ,
#
# evaluated by :meth:`PropagatorCache._C_value_direct` and the
# :meth:`~PropagatorCache.C_value` dblquad fallback alongside the
# existing 2-D ``kappa2_smooth`` integral. All
# ``precompute_C_table_*`` builders call ``_C_value_direct``, so
# the white-noise term is automatically absorbed into every
# homogeneity-mode spline with no caller-side changes.
sigma2: Callable | None = None
class _LazyTimeSplineCache:
"""Per-parameter-value 2-D ``(t1, t2)`` spline cache for lazy
spatial evaluation.
Backs the lazy mode of :class:`PropagatorCache` for each
homogeneity symmetry: instead of pre-building an (n+1)-D spline
over a pre-allocated parameter grid, this cache builds a 2-D
time spline on demand for each distinct parameter value seen.
Parameter semantics per mode:
- ``translation``: key is ``r = |x1 − x2|`` (scalar ≥ 0).
- ``rotation``: key is ``cos θ = x1·x2 / (|x1| |x2|)`` (scalar
in ``[-1, 1]``).
- ``general``: key is ``(x1_tuple, x2_tuple)`` — a pair of
tuples over all x dimensions.
Memoization key is the parameter rounded to ``round_decimals``
decimal places so floating-point near-duplicates collapse.
"""
def __init__(
self,
parent: "PropagatorCache",
t_max: float,
n_grid_t: int,
mode: str,
round_decimals: int = 10,
n_jobs: int = 1,
):
self.parent = parent
self.t_max = t_max
self.n_grid_t = n_grid_t
self.mode = mode
self.round_decimals = round_decimals
self.ts = np.linspace(parent.model.t_min, t_max, n_grid_t)
self._splines_by_key: dict = {}
#: Parallel-worker count forwarded to :meth:`_build`. ``1``
#: (default) is serial; ``-1`` is all cores via
#: :mod:`joblib.Parallel` with the ``loky`` backend. Each
#: on-demand build for a new parameter value pays a one-time
#: worker-startup cost (~1 s) before the ``n_grid_t²``
#: independent ``_C_value_direct`` calls fan out across
#: cores.
self.n_jobs = n_jobs
def get_splines(self, x1_val, x2_val) -> list:
"""Return the list of per-component 2-D splines for this
(x1, x2) pair. Builds and memoizes on first call."""
key = self._make_key(x1_val, x2_val)
if key not in self._splines_by_key:
self._splines_by_key[key] = self._build(x1_val, x2_val)
return self._splines_by_key[key]
def _make_key(self, x1_val, x2_val):
"""Derive the memoization key. In translation/rotation mode
the key is derived from the parameter (r or cos); in general
mode it's the (x1, x2) tuple pair."""
if self.mode == "translation":
# Translation invariance: C depends only on ``||x1 - x2||``.
# Accept scalar or arbitrary-dimensional vector inputs;
# the cache shape stays (t1, t2, r) regardless of d.
diff = np.asarray(x1_val, dtype=float) - np.asarray(x2_val, dtype=float)
r = float(abs(diff)) if diff.ndim == 0 else float(np.linalg.norm(diff))
return ("r", round(r, self.round_decimals))
if self.mode == "rotation":
cos_val = _rotation_cos(x1_val, x2_val)
return ("cos", round(float(cos_val), self.round_decimals))
# general: keep both endpoints
a = tuple(np.round(np.atleast_1d(x1_val), self.round_decimals).tolist())
b = tuple(np.round(np.atleast_1d(x2_val), self.round_decimals).tolist())
return ("xx", a, b)
def _build(self, x1_val, x2_val) -> list:
"""Build 2-D ``(t1, t2)`` splines at this specific parameter
value by evaluating ``_C_value_direct`` on the time grid.
Parallelises across the ``n_grid_t × n_grid_t`` grid points
when :attr:`n_jobs` ≠ 1, mirroring the full-grid builders.
"""
from scipy.interpolate import RectBivariateSpline
parent = self.parent
N = parent.model.n_components
ts = self.ts
n_t = self.n_grid_t
tasks = [
(i, j, ts[i], ts[j])
for i in range(n_t)
for j in range(n_t)
]
x1_arr = np.asarray(x1_val)
x2_arr = np.asarray(x2_val)
def _point(args):
i, j, t1, t2 = args
C_mat = parent._C_value_direct(x1_arr, t1, x2_arr, t2)
return i, j, np.array([C_mat[a, a] for a in range(N)])
# Serial fast path; matches the full-grid builders' short-list
# threshold so small caches don't pay joblib startup.
if self.n_jobs == 1 or len(tasks) <= 4:
results = [_point(t) for t in tasks]
else:
from joblib import Parallel, delayed
results = Parallel(n_jobs=self.n_jobs, backend="loky")(
delayed(_point)(t) for t in tasks
)
grids = [np.zeros((n_t, n_t)) for _ in range(N)]
for i, j, cvec in results:
for a in range(N):
grids[a][i, j] = cvec[a]
return [RectBivariateSpline(ts, ts, grids[a]) for a in range(N)]
def _C_value_direct_gl(
model: "PropagatorModel",
n1: Any,
t1: float,
n2: Any,
t2: float,
n_gauss: int = 20,
) -> np.ndarray:
"""Gauss-Legendre evaluation of the C-propagator 2-D integral.
Drop-in replacement for the ``scipy.integrate.dblquad`` path inside
:meth:`PropagatorCache._C_value_direct` for the common case where
κ² is smooth except for a folded ``|λ1 − λ2|`` cusp on the diagonal
(the OU / exponential-temporal kernels used in demo1, demo2, and
the test suite).
Strategy: rather than apply a single tensor-product Gauss-Legendre
rule on the rectangle ``[t_min, t1] × [t_min, t2]`` -- where the
diagonal cusp ruins polynomial convergence and gives O(10%) error
even at ``n_gauss=30`` -- we split the rectangle at ``λ1 = λ2``
into 2 (square case) or 3 (rectangle case) sub-regions on each of
which the integrand is smooth, then apply a fixed
``n_gauss × n_gauss`` rule per sub-region.
Sub-regions for ``t_min ≤ λ1 ≤ t1, t_min ≤ λ2 ≤ t2``:
* Let ``t_d = min(t1, t2)``. In the square ``[t_min, t_d]²`` the
diagonal cuts through.
- Region A (lower triangle): ``λ2 ≤ λ1``, so ``|λ1−λ2|=λ1−λ2``.
- Region B (upper triangle): ``λ1 ≤ λ2``, so ``|λ1−λ2|=λ2−λ1``.
* Outside the square (only if ``t1 ≠ t2``):
- If ``t1 > t2``: Region C is the strip
``t_d ≤ λ1 ≤ t1, t_min ≤ λ2 ≤ t2`` where ``λ1 > λ2``.
- If ``t2 > t1``: Region C is the strip
``t_min ≤ λ1 ≤ t1, t_d ≤ λ2 ≤ t2`` where ``λ2 > λ1``.
Each sub-region is handled by an iterated 1-D Gauss-Legendre rule:
outer pass over ``λ1`` with ``n_gauss`` nodes, inner pass over
``λ2`` with ``n_gauss`` nodes whose interval depends on ``λ1`` for
the triangular regions.
For the triangular regions the inner integral has a node-dependent
upper or lower bound; we map the inner ``[-1, 1]`` GL nodes onto
that variable interval each time, accumulating the Jacobian.
The white-noise δ-correlated piece (when ``model.sigma2`` is set)
is added via a single 1-D Gauss-Legendre pass on
``[t_min, min(t1, t2)]`` -- the integrand there is smooth (no cusp).
Args:
model: The propagator model.
n1, t1, n2, t2: As in :meth:`PropagatorCache._C_value_direct`.
n_gauss: Number of GL nodes per dimension per sub-region.
Returns:
``(N, N)`` array of C values, identical in shape and meaning
to the dblquad path.
"""
from numpy.polynomial.legendre import leggauss
N = model.n_components
t_min = model.t_min
C_mat = np.zeros((N, N))
if t1 <= t_min or t2 <= t_min:
# Empty integration domain along at least one axis -- C = 0.
return C_mat
# 1-D GL nodes & weights on [-1, 1], reused across sub-regions.
nodes, weights = leggauss(n_gauss)
# ---- helper: evaluate R-diagonal at a single time pair ------- #
def _r_diag(t_obs: float, lam: float) -> np.ndarray:
"""Return shape ``(N,)`` -- the diagonal of ``R(t_obs, lam)``.
For ``iso_R`` the same scalar fills every component. For
``diag_C=True`` only diagonal entries are needed, so we
always return the diagonal slice. When ``diag_C=False`` we
still call this for entries ``[a, a]`` of R (R is always
diagonal in component indices in MSR), but the κ² off-diag
couples them.
"""
rt = model.R_time(t_obs, lam)
if model.iso_R:
return np.full(N, float(rt))
rt_arr = np.asarray(rt, dtype=float)
return np.array([rt_arr[a, a] for a in range(N)])
# ---- helper: evaluate κ² at a single time pair --------------- #
def _kappa(lam1: float, lam2: float) -> np.ndarray:
return np.asarray(model.kappa2(n1, lam1, n2, lam2), dtype=float)
# ---- helper: contract one (R_diag(t1,lam1), kappa, R_diag(t2,lam2))
# triple into the (N, N) per-point integrand value ------------- #
def _integrand_block(
r1d: np.ndarray, kmat: np.ndarray, r2d: np.ndarray,
) -> np.ndarray:
# element [a, b] = r1d[a] * kmat[a, b] * r2d[b]
if model.diag_C:
# Only diagonal matters; mask kmat to its diagonal.
out = np.zeros((N, N))
diag = np.array([kmat[a, a] for a in range(N)])
for a in range(N):
out[a, a] = r1d[a] * diag[a] * r2d[a]
return out
return (r1d[:, None] * kmat) * r2d[None, :]
# ---- helper: integrate over a triangular region -------------- #
# Region of the form a_lo ≤ λ1 ≤ a_hi, inner_lo(λ1) ≤ λ2 ≤ inner_hi(λ1).
# Both ``a_lo, a_hi`` are scalars; ``inner_lo`` and ``inner_hi``
# are callables of λ1.
def _gl_region(
lam1_lo: float, lam1_hi: float,
inner_lo: Callable[[float], float],
inner_hi: Callable[[float], float],
) -> np.ndarray:
if lam1_hi <= lam1_lo:
return np.zeros((N, N))
# Outer mapping: λ1 = lam1_lo + 0.5*(lam1_hi-lam1_lo)*(node+1)
half_outer = 0.5 * (lam1_hi - lam1_lo)
mid_outer = 0.5 * (lam1_hi + lam1_lo)
lam1_pts = mid_outer + half_outer * nodes
outer_w = half_outer * weights
acc = np.zeros((N, N))
for k, lam1 in enumerate(lam1_pts):
lo = inner_lo(float(lam1))
hi = inner_hi(float(lam1))
if hi <= lo:
continue
half_inner = 0.5 * (hi - lo)
mid_inner = 0.5 * (hi + lo)
lam2_pts = mid_inner + half_inner * nodes
inner_w = half_inner * weights
r1d = _r_diag(t1, float(lam1))
inner_acc = np.zeros((N, N))
for j, lam2 in enumerate(lam2_pts):
kmat = _kappa(float(lam1), float(lam2))
r2d = _r_diag(t2, float(lam2))
inner_acc += inner_w[j] * _integrand_block(r1d, kmat, r2d)
acc += outer_w[k] * inner_acc
return acc
t_d = min(t1, t2)
# Region A: t_min ≤ λ2 ≤ λ1 ≤ t_d (lower triangle, λ1 > λ2).
if t_d > t_min:
C_mat += _gl_region(
t_min, t_d,
inner_lo=lambda _l1: t_min,
inner_hi=lambda l1: l1,
)
# Region B: t_min ≤ λ1 ≤ λ2 ≤ t_d (upper triangle, λ1 < λ2).
C_mat += _gl_region(
t_min, t_d,
inner_lo=lambda l1: l1,
inner_hi=lambda _l1: t_d,
)
# Region C: outside the square (only if t1 != t2).
if t1 > t2:
# λ1 ∈ [t_d, t1], λ2 ∈ [t_min, t2] = [t_min, t_d]. λ1 > λ2.
C_mat += _gl_region(
t_d, t1,
inner_lo=lambda _l1: t_min,
inner_hi=lambda _l1: t_d,
)
elif t2 > t1:
# λ2 ∈ [t_d, t2], λ1 ∈ [t_min, t1] = [t_min, t_d]. λ2 > λ1.
C_mat += _gl_region(
t_min, t_d,
inner_lo=lambda _l1: t_d,
inner_hi=lambda _l1: t2,
)
# ---- White-noise δ piece: 1-D GL on [t_min, min(t1, t2)] ----- #
if model.sigma2 is not None:
t_upper = t_d
if t_upper > t_min:
half = 0.5 * (t_upper - t_min)
mid = 0.5 * (t_upper + t_min)
tau_pts = mid + half * nodes
tau_w = half * weights
for k, tau in enumerate(tau_pts):
tau_f = float(tau)
r1d = _r_diag(t1, tau_f)
r2d = _r_diag(t2, tau_f)
sig = np.asarray(model.sigma2(n1, tau_f, n2), dtype=float)
C_mat += tau_w[k] * _integrand_block(r1d, sig, r2d)
return C_mat
def _rotation_cos(x1, x2) -> float:
"""Cosine similarity ``x1·x2 / (|x1| |x2|)`` as a scalar.
Works for scalar x (interpreted as 1-D vector) and array x.
Returns ``1.0`` when either vector is exactly zero (degenerate;
avoids a divide-by-zero NaN).
"""
v1 = np.atleast_1d(np.asarray(x1, dtype=float))
v2 = np.atleast_1d(np.asarray(x2, dtype=float))
n1 = float(np.linalg.norm(v1))
n2 = float(np.linalg.norm(v2))
if n1 == 0.0 or n2 == 0.0:
return 1.0
return float(np.dot(v1, v2) / (n1 * n2))
def _resolve_integrate_over(
integrate_over: Any,
ext_vars: list[str],
) -> set[str]:
"""Normalise the ``integrate_over`` kwarg into a set of external
point names to integrate over.
Accepts:
- ``None`` — the empty set (all externals fixed at ``lambda_f``).
This is the physics-observable convention
``⟨φ(t_f) · φ(t_f)⟩``.
- ``"all"`` — the full set ``ext_vars`` (time-integrated moment).
- Iterable of names — used as-is, after validating that each
name appears in ``ext_vars``.
Raises ``ValueError`` on an unknown external name.
"""
if integrate_over is None:
return set()
if isinstance(integrate_over, str):
if integrate_over == "all":
return set(ext_vars)
raise ValueError(
f"integrate_over must be None, 'all', or an iterable of "
f"external-point names; got string {integrate_over!r}."
)
requested = set(integrate_over)
unknown = requested - set(ext_vars)
if unknown:
raise ValueError(
f"integrate_over references external points that don't "
f"exist in this diagram: {sorted(unknown)}. Known "
f"externals: {ext_vars}."
)
return requested
[docs]
class PropagatorCache:
"""Evaluates and caches propagator values.
Computes C via double integration of ``R · κ · R`` and caches results
to avoid redundant quadrature.
Args:
model: The physical propagator model.
quad_opts: Options passed to ``scipy.integrate.dblquad``
(e.g. ``{'epsabs': 1e-8, 'epsrel': 1e-8}``).
"""
def __init__(
self,
model: PropagatorModel,
quad_opts: dict | None = None,
homogeneity: str = "translation",
interp_method: str = "linear",
c_method: str = "dblquad",
n_gauss: int = 20,
):
"""Initialise the propagator cache.
Args:
model: Propagator model specifying R and κ².
quad_opts: Options forwarded to ``scipy.integrate.dblquad``.
c_method: Quadrature method used by :meth:`_C_value_direct`
(and therefore by every ``precompute_C_table_*`` builder).
- ``'dblquad'`` (default, robust): adaptive 2-D
quadrature via :func:`scipy.integrate.dblquad`.
Handles arbitrary κ² but slow (10-80 ms per call)
due to adaptive recursion.
- ``'gauss_legendre'``: tensor-product Gauss-Legendre
with diagonal-aware sub-region splitting at
``λ1 = λ2``. ~10-1000× faster than ``'dblquad'``
on smooth κ² with a single ``|λ1−λ2|`` cusp on the
diagonal (the demo1/demo2 OU-style kernels). See
:func:`_C_value_direct_gl` for details.
Each ``precompute_C_table_*`` builder also accepts a
local ``c_method`` kwarg that overrides this default
for the duration of the build.
n_gauss: Number of Gauss-Legendre nodes per dimension per
sub-region when ``c_method='gauss_legendre'``. Default
``20`` is enough for machine precision on smooth OU /
Gaussian kernels. Cost scales as ``n_gauss²`` per
sub-region (3 sub-regions for ``t1 ≠ t2``, 2 for
``t1 = t2``).
homogeneity: Physical assumption about how C depends on the
spatial coordinates. Determines which
``precompute_C_table_*`` builder is valid.
- ``'translation'`` (default): C depends only on
``|x1 − x2|``. Appropriate for translation-invariant
(spatially stationary) noise — the most common case.
Build via :meth:`precompute_C_table_translation`.
- ``'rotation'``: C depends only on ``x1 · x2``.
Appropriate when x is a unit direction vector (e.g.
on the sphere). Build via
:meth:`precompute_C_table_rotation`.
- ``'general'``: no symmetry assumed; C is evaluated on
a full 4-D grid in ``(t1, t2, x1, x2)``. Build via
:meth:`precompute_C_table_general`.
When a spatial builder is called with its ``*_grid``
argument left ``None``, the cache enters **lazy mode**
for that symmetry: 2-D ``(t1, t2)`` splines are built
on-demand for each distinct parameter value queried,
and memoized. This is the right default for
moment-at-fixed-points workflows where only a few
distinct parameter values ever appear.
interp_method: ``RegularGridInterpolator`` method used by
the three full-grid spline builders
(``precompute_C_table_translation``,
``precompute_C_table_rotation``,
``precompute_C_table_general``).
- ``'linear'`` (default, safe): O(h²) accuracy,
monotone -- never overshoots / flips sign even on
steeply-decaying C tails. Required when C spans
many orders of magnitude across the r-grid (the
typical cosmological setting; see
``tests/test_evaluate_interpolation_accuracy.py``).
- ``'cubic'``: O(h⁴) accuracy on smooth, well-sampled
grids -- only safe when the user knows their C is
bounded away from grid-induced sign-flip artefacts.
``RegularGridInterpolator`` accepts any other method
name supported by the installed scipy (e.g.
``'quintic'``, ``'pchip'``, ``'nearest'``); they are
forwarded as-is and validated lazily on table build.
"""
if homogeneity not in ("translation", "rotation", "general"):
raise ValueError(
f"homogeneity must be 'translation', 'rotation', or "
f"'general'; got {homogeneity!r}"
)
if c_method not in ("dblquad", "gauss_legendre"):
raise ValueError(
f"c_method must be 'dblquad' or 'gauss_legendre'; "
f"got {c_method!r}"
)
if int(n_gauss) < 2:
raise ValueError(
f"n_gauss must be >= 2; got {n_gauss!r}"
)
self.model = model
self.quad_opts = quad_opts or {}
self.homogeneity = homogeneity
self.interp_method = interp_method
self.c_method = c_method
self.n_gauss = int(n_gauss)
# Closed-form-only path: skip every spline and route C lookups
# directly through ``self._C_value_direct``. Set by
# ``Propagators.build()`` when both ``c_closed_form`` and
# ``c_closed_form_only=True`` are passed; the user's analytical
# C function then becomes the lookup function with no
# interpolation error. ``closed_form_vectorized`` flags whether
# the user's c_fn accepts batched (n,)-shape t and (n,)- or
# broadcast-scalar n inputs and returns ``(n, N, N)``; if not,
# the per-sample fallback is used (slow, but correct).
self._closed_form_only: bool = False
self._closed_form_vectorized: bool = False
self._c_cache: dict[tuple, np.ndarray] = {}
# Legacy 2-D (t1, t2) spline at fixed x — still populated by
# ``precompute_C_table`` for backward compatibility.
self._c_splines: list | None = None
self._c_table_range: tuple[float, float] | None = None
# Full N-D spatial splines, populated by precompute_C_table_*
# with an explicit parameter grid:
self._c_translation_splines: list | None = None # 3-D (t1,t2,r)
self._c_translation_r_range: tuple[float, float] | None = None
self._c_rotation_splines: list | None = None # 3-D (t1,t2,cos)
self._c_general_interpolators: list | None = None # 4-D
self._c_general_axes: tuple | None = None
# Lazy per-parameter-value 2-D (t1, t2) spline caches. A
# :class:`_LazyTimeSplineCache` is attached when the user
# calls a ``precompute_C_table_*`` method without a parameter
# grid (see ``lazy mode`` in the docstring). Keyed by
# rounded parameter value (r, cos, or (x1, x2)).
self._lazy_translation: _LazyTimeSplineCache | None = None
self._lazy_rotation: _LazyTimeSplineCache | None = None
self._lazy_general: _LazyTimeSplineCache | None = None
[docs]
def R_time(self, t_left: float, t_right: float) -> float | np.ndarray:
"""Evaluate R_time(t_left, t_right).
Returns scalar if ``model.iso_R=True``, else ``(N, N)`` array.
The Θ function is NOT enforced here — the integration domain
handles causality.
"""
return self.model.R_time(t_left, t_right)
[docs]
def R_product(
self,
r_pairs: tuple[tuple[str, str], ...],
times: dict[str, float],
) -> float:
"""Product of R_time over all R propagator pairs.
Only valid when ``model.iso_R=True`` (R is scalar).
"""
result = 1.0
for sl, sr in r_pairs:
result *= float(self.R_time(times[sl], times[sr]))
return result
[docs]
def C_value(
self,
n1: Any,
t1: float,
n2: Any,
t2: float,
) -> np.ndarray:
"""Compute the full C matrix ``C_{ab}(n1, t1; n2, t2)``.
If :meth:`precompute_C_table` has been called and ``t1``, ``t2``
are within the table range, uses fast spline interpolation.
Otherwise falls back to ``scipy.integrate.dblquad``::
C_{ab} = ∫_{t_min}^{t1} dλ' ∫_{t_min}^{t2} dλ''
R_time(t1,λ') κ_{ab}(n1,λ'; n2,λ'') R_time(t2,λ'')
Returns:
``(N, N)`` array.
"""
# Fast path (spatial-aware): route through C_at_batch which
# dispatches on ``homogeneity`` + full/lazy table presence.
# Only used when a spatial table has actually been built —
# otherwise fall through to the legacy/dblquad paths so the
# plain ``PropagatorCache(model)`` + legacy ``precompute_C_table``
# flow stays bit-identical to before.
has_spatial_table = (
self._c_translation_splines is not None
or self._lazy_translation is not None
or self._c_rotation_splines is not None
or self._lazy_rotation is not None
or self._c_general_interpolators is not None
or self._lazy_general is not None
)
if has_spatial_table and self.model.diag_C:
N = self.model.n_components
t1_arr = np.array([t1], dtype=float)
t2_arr = np.array([t2], dtype=float)
# n1, n2 may be scalars or arrays (e.g. np.float64 or
# small np.ndarray); pass through np.asarray so the
# downstream rotation/general paths see consistent shape.
x1_arr = np.asarray(n1, dtype=float).reshape(1, -1) \
if np.ndim(n1) > 0 else np.array([float(n1)])
x2_arr = np.asarray(n2, dtype=float).reshape(1, -1) \
if np.ndim(n2) > 0 else np.array([float(n2)])
if x1_arr.ndim == 2 and x1_arr.shape[-1] == 1:
x1_arr = x1_arr.ravel()
x2_arr = x2_arr.ravel()
c_diag = self.C_at_batch(t1_arr, t2_arr, x1_arr, x2_arr)[0]
C_mat = np.zeros((N, N))
for a in range(N):
C_mat[a, a] = c_diag[a]
return C_mat
# Fast path (legacy time-only): 2-D spline table
if self._c_splines is not None and self.model.diag_C:
lo, hi = self._c_table_range # type: ignore[misc]
if lo <= t1 <= hi and lo <= t2 <= hi:
return self._C_value_from_table(t1, t2)
# Check (n1, t1, n2, t2) LRU cache
def _cache_key_part(obj):
if isinstance(obj, np.ndarray):
return ("ndarray", id(obj))
if isinstance(obj, list):
return tuple(_cache_key_part(v) for v in obj)
if isinstance(obj, tuple):
return tuple(_cache_key_part(v) for v in obj)
return obj
cache_key = (_cache_key_part(n1), t1, _cache_key_part(n2), t2)
if cache_key in self._c_cache:
return self._c_cache[cache_key]
# Delegate to ``_C_value_direct`` (which adds the white-noise
# 1-D contribution when ``model.sigma2`` is set), then store
# in the per-arg LRU.
C_mat = self._C_value_direct(n1, t1, n2, t2)
self._c_cache[cache_key] = C_mat
return C_mat
[docs]
def C_diagonal(
self,
n: Any,
t1: float,
n_prime: Any | None = None,
t2: float | None = None,
) -> np.ndarray:
"""Return diagonal ``[C_{00}, C_{11}, …]`` as a 1D array.
If ``n_prime`` and ``t2`` are None, uses equal-point ``C(n,t; n,t)``.
Uses spline table when available for fast lookup.
"""
if n_prime is None:
n_prime = n
if t2 is None:
t2 = t1
# Fast path: spline table
if self._c_splines is not None:
lo, hi = self._c_table_range # type: ignore[misc]
if lo <= t1 <= hi and lo <= t2 <= hi:
return self._C_diagonal_from_table(t1, t2)
C_mat = self.C_value(n, t1, n_prime, t2)
return np.diag(C_mat)
[docs]
def clear_cache(self) -> None:
"""Clear the C value cache and spline table."""
self._c_cache.clear()
self._c_splines = None
self._c_table_range = None
[docs]
def precompute_C_table(
self,
t_max: float,
n_grid: int = 100,
direction: Any = 0,
) -> None:
"""Pre-compute C on a grid and build spline interpolators.
For ``iso_R + diag_C`` with direction-independent ``kappa2``,
``C_{aa}(t1, t2)`` depends only on ``(t1, t2)``. This method
evaluates C on an ``n_grid × n_grid`` grid via ``dblquad``,
then builds a :class:`~scipy.interpolate.RectBivariateSpline`
for each diagonal component.
After calling this, :meth:`C_value` and :meth:`C_diagonal`
use fast spline interpolation instead of ``dblquad``.
Args:
t_max: Upper bound of the time grid.
n_grid: Number of grid points per axis (default 100).
direction: Direction value to use for kappa2 evaluation.
"""
from scipy.interpolate import RectBivariateSpline
m = self.model
N = m.n_components
t_min = m.t_min
ts = np.linspace(t_min, t_max, n_grid)
# Build grid for each diagonal component
grids = [np.zeros((n_grid, n_grid)) for _ in range(N)]
for i in range(n_grid):
for j in range(n_grid):
C_mat = self.C_value(direction, ts[i], direction, ts[j])
for a in range(N):
grids[a][i, j] = C_mat[a, a]
self._c_splines = [
RectBivariateSpline(ts, ts, grids[a])
for a in range(N)
]
self._c_table_range = (t_min, t_max)
@property
def has_table(self) -> bool:
"""Whether a pre-computed C table is available."""
return self._c_splines is not None
def _C_diagonal_from_table(self, t1: float, t2: float) -> np.ndarray:
"""Look up C diagonal from spline table."""
return np.array([
float(s(t1, t2, grid=False)) for s in self._c_splines # type: ignore[union-attr]
])
def _C_value_from_table(self, t1: float, t2: float) -> np.ndarray:
"""Look up C matrix from spline table (diagonal only)."""
N = self.model.n_components
C_mat = np.zeros((N, N))
for a, s in enumerate(self._c_splines): # type: ignore[union-attr]
C_mat[a, a] = float(s(t1, t2, grid=False))
return C_mat
# --- Vectorized methods for batch evaluation ---
[docs]
def C_diagonal_batch(
self, t1: np.ndarray, t2: np.ndarray,
) -> np.ndarray:
"""Evaluate diagonal C at arrays of time pairs.
Requires :meth:`precompute_C_table` to have been called.
Args:
t1: Array of shape ``(n,)`` — first time coordinates.
t2: Array of shape ``(n,)`` — second time coordinates.
Returns:
Array of shape ``(n, N)`` where ``N`` is the number of
field components. ``result[s, a]`` is ``C_{aa}(t1[s], t2[s])``.
"""
if self._c_splines is None:
raise RuntimeError(
"C_diagonal_batch requires precompute_C_table()"
)
n = len(t1)
N = self.model.n_components
result = np.empty((n, N))
for a, s in enumerate(self._c_splines):
result[:, a] = s(t1, t2, grid=False)
return result
[docs]
def R_time_batch(self, t1: np.ndarray, t2: np.ndarray) -> np.ndarray:
"""Evaluate R_time at arrays of time pairs (iso_R only).
Args:
t1: Array of shape ``(n,)`` — left times.
t2: Array of shape ``(n,)`` — right times.
Returns:
Array of shape ``(n,)``. R(t1, t2) = Θ(t1−t2) × R_time(t1, t2).
"""
# Vectorize the user-provided R_time function
R_vec = np.vectorize(self.model.R_time)
return R_vec(t1, t2)
# ------------------------------------------------------------------ #
# Spatial-aware extension: (t, x) coordinates via homogeneity modes
# ------------------------------------------------------------------ #
[docs]
def precompute_C_table_translation(
self,
t_max: float,
n_grid_t: int = 60,
r_max: float | None = None,
n_grid_r: int | None = None,
n_jobs: int = 1,
c_method: str = "dblquad",
n_gauss: int = 20,
) -> None:
"""Enable spatial support under the translation-invariance
assumption:
``C(t1, t2; x1, x2)`` depends only on ``|x1 − x2|``.
Two operating modes depending on whether ``r_max`` and
``n_grid_r`` are supplied:
**Lazy mode (default, recommended for fixed-point moments)**
``r_max=None`` and ``n_grid_r=None``. No r-grid is
pre-computed. When :meth:`C_at_batch` encounters a new
``r = |x1−x2|`` value it builds and caches a 2-D
``(t1, t2)`` spline on demand. For a moment calculation
at fixed external points ``(x_1, …, x_n)`` there are at
most ``O(n²)`` distinct r-values, so lazy mode saves an
enormous amount of work vs a densely-sampled r-grid —
particularly when the underlying ``_C_value_direct`` is
``scipy.integrate.dblquad`` (expensive).
**Full-grid mode (right for sweeping r curves)**
Both ``r_max`` and ``n_grid_r`` provided. A 3-D
``(t1, t2, r)`` spline is built up-front over the full
range, giving O(1) evaluation per query — faster once
``n_distinct_r >> n_grid_r``. Appropriate when the user
will plot ``C`` or a derived observable as a function of
r over a continuous range.
Args:
t_max: upper bound of the (t1, t2) time grid.
n_grid_t: grid size along each time axis.
r_max: upper bound of the r = ``|x1-x2|`` grid
(full-grid mode). ``None`` ⇒ lazy mode.
n_grid_r: grid size along r (full-grid mode).
``None`` ⇒ lazy mode.
n_jobs: parallel workers for grid-point ``_C_value_direct``
evaluations. ``1`` serial, ``-1`` all cores via
:mod:`joblib` (``loky`` backend). Applies in **both**
full-grid mode (fans out across the 3-D ``(t1, t2, r)``
grid) **and** lazy mode (fans out across each on-demand
2-D ``(t1, t2)`` build; each new r pays a ~1 s worker
startup cost).
c_method: Quadrature method passed to
:meth:`_C_value_direct` for every grid-point
evaluation (full-grid mode only -- lazy mode will use
the cache instance's :attr:`c_method` default).
- ``'dblquad'`` (default): backwards-compatible
adaptive quadrature.
- ``'gauss_legendre'``: tensor-product GL with
diagonal-aware sub-region splitting. 10-1000×
faster on smooth κ² with a single ``|λ1−λ2|`` cusp.
See :func:`_C_value_direct_gl`.
n_gauss: Per-dimension GL node count when
``c_method='gauss_legendre'``. Default ``20`` is
enough for machine precision on demo1/demo2 OU
kernels.
Requires ``self.homogeneity == 'translation'``.
Side effect: sets either ``_c_translation_splines`` (full)
or ``_lazy_translation`` (lazy).
"""
if self.homogeneity != "translation":
raise RuntimeError(
f"precompute_C_table_translation requires "
f"homogeneity='translation', got {self.homogeneity!r}"
)
m = self.model
t_min = m.t_min
self._c_table_range = (t_min, t_max)
# Lazy mode: defer per-r spline construction to first query.
if r_max is None or n_grid_r is None:
self._lazy_translation = _LazyTimeSplineCache(
self, t_max, n_grid_t, mode="translation",
n_jobs=n_jobs,
)
return
# Full-grid mode: build the 3-D (t1, t2, r) spline now.
from scipy.interpolate import RegularGridInterpolator
N = m.n_components
ts = np.linspace(t_min, t_max, n_grid_t)
rs = np.linspace(0.0, r_max, n_grid_r)
tasks = [
(i, j, k, ts[i], ts[j], rs[k])
for i in range(n_grid_t)
for j in range(n_grid_t)
for k in range(n_grid_r)
]
def _point(args):
i, j, k, t1, t2, r = args
# TODO(d-dim): replace scalar r with np.ndarray x_diff
# Forward the GL kwargs only when explicitly requested, so
# subclasses overriding ``_C_value_direct`` with the
# legacy 4-arg signature continue to work unchanged.
if c_method != "dblquad":
C_mat = self._C_value_direct(
np.asarray(0.0), t1, np.asarray(r), t2,
method=c_method, n_gauss=n_gauss,
)
else:
C_mat = self._C_value_direct(
np.asarray(0.0), t1, np.asarray(r), t2,
)
return i, j, k, np.array([C_mat[a, a] for a in range(N)])
if n_jobs == 1 or len(tasks) <= 4:
results = [_point(t) for t in tasks]
else:
from joblib import Parallel, delayed
results = Parallel(n_jobs=n_jobs, backend="loky")(
delayed(_point)(t) for t in tasks
)
grids = [np.zeros((n_grid_t, n_grid_t, n_grid_r)) for _ in range(N)]
for i, j, k, cvec in results:
for a in range(N):
grids[a][i, j, k] = cvec[a]
self._c_translation_splines = [
RegularGridInterpolator(
(ts, ts, rs), grids[a],
bounds_error=False, fill_value=None,
# Default 'linear' (set in PropagatorCache.__init__):
# tensor-product cubic on a steeply decaying C produces
# sign flips and O(1) overshoot in the tail; linear is
# monotone and safe. Users with smooth C and dense
# grids can opt into cubic via interp_method='cubic'.
# See tests/test_evaluate_interpolation_accuracy.py.
method=self.interp_method,
)
for a in range(N)
]
self._c_translation_r_range = (0.0, r_max)
[docs]
def precompute_C_table_rotation(
self,
t_max: float,
n_grid_t: int = 60,
n_grid_cos: int | None = None,
n_jobs: int = 1,
c_method: str = "dblquad",
n_gauss: int = 20,
) -> None:
"""Enable spatial support under the rotation-invariance
assumption:
``C(t1, t2; x1, x2)`` depends only on ``x1 · x2``.
Appropriate when ``x`` is a unit direction vector (e.g. on
the sphere). The effective spatial parameter is
``cos θ ∈ [−1, 1]``.
Two modes, analogous to
:meth:`precompute_C_table_translation`:
- Lazy (``n_grid_cos=None``): 2-D ``(t1, t2)`` splines built
on demand per distinct ``cos θ`` value. Best when only
a few ``cos θ`` values arise (e.g. a fixed-set moment
calculation).
- Full-grid (``n_grid_cos`` integer): 3-D
``(t1, t2, cos)`` spline over the whole ``cos ∈ [−1, 1]``
range. Best for angular-correlation sweeps.
Under the hood, the reference evaluation at a specific
``cos θ`` uses two representative unit vectors
``e_1 = (1, 0, …)`` and ``e_2`` chosen so that
``e_1 · e_2 = cos θ`` (a 2-D rotation of ``e_1``).
Args:
n_jobs: parallel workers for grid-point
``_C_value_direct`` evaluations; applies in **both**
full-grid (fans out across the 3-D ``(t1, t2, cos)``
grid) and lazy mode (fans out across each on-demand
2-D build). ``1`` serial, ``-1`` all cores via
:mod:`joblib`.
c_method: ``'dblquad'`` (default) or ``'gauss_legendre'``
-- forwarded to :meth:`_C_value_direct` for every
full-grid point. See
:meth:`precompute_C_table_translation` for details.
n_gauss: Per-dim GL node count for
``c_method='gauss_legendre'``; default ``20``.
Requires ``self.homogeneity == 'rotation'``.
"""
if self.homogeneity != "rotation":
raise RuntimeError(
f"precompute_C_table_rotation requires "
f"homogeneity='rotation', got {self.homogeneity!r}"
)
m = self.model
t_min = m.t_min
self._c_table_range = (t_min, t_max)
if n_grid_cos is None:
self._lazy_rotation = _LazyTimeSplineCache(
self, t_max, n_grid_t, mode="rotation",
n_jobs=n_jobs,
)
return
from scipy.interpolate import RegularGridInterpolator
N = m.n_components
ts = np.linspace(t_min, t_max, n_grid_t)
coses = np.linspace(-1.0, 1.0, n_grid_cos)
def _rep_vectors(cos_val):
"""Return a pair of unit vectors with dot product cos_val.
We use the 2-D representation ``e_1 = (1, 0)`` and
``e_2 = (cos, sin)`` where ``sin = √(1 − cos²)``. This
is sufficient when ``kappa2`` depends only on ``x1·x2``
(rotation invariance makes the ambient dimension
irrelevant).
"""
c = float(cos_val)
c = max(-1.0, min(1.0, c))
s = float(np.sqrt(max(0.0, 1.0 - c * c)))
return np.array([1.0, 0.0]), np.array([c, s])
tasks = [
(i, j, k, ts[i], ts[j], coses[k])
for i in range(n_grid_t)
for j in range(n_grid_t)
for k in range(n_grid_cos)
]
def _point(args):
i, j, k, t1, t2, cos_val = args
e1, e2 = _rep_vectors(cos_val)
# See ``precompute_C_table_translation._point`` for the
# legacy-subclass-compat rationale.
if c_method != "dblquad":
C_mat = self._C_value_direct(
e1, t1, e2, t2,
method=c_method, n_gauss=n_gauss,
)
else:
C_mat = self._C_value_direct(e1, t1, e2, t2)
return i, j, k, np.array([C_mat[a, a] for a in range(N)])
if n_jobs == 1 or len(tasks) <= 4:
results = [_point(t) for t in tasks]
else:
from joblib import Parallel, delayed
results = Parallel(n_jobs=n_jobs, backend="loky")(
delayed(_point)(t) for t in tasks
)
grids = [np.zeros((n_grid_t, n_grid_t, n_grid_cos)) for _ in range(N)]
for i, j, k, cvec in results:
for a in range(N):
grids[a][i, j, k] = cvec[a]
self._c_rotation_splines = [
RegularGridInterpolator(
(ts, ts, coses), grids[a],
bounds_error=False, fill_value=None,
# Default 'linear' (set in PropagatorCache.__init__):
# tensor-product cubic on a steeply decaying C produces
# sign flips and O(1) overshoot in the tail; linear is
# monotone and safe. Users with smooth C and dense
# grids can opt into cubic via interp_method='cubic'.
# See tests/test_evaluate_interpolation_accuracy.py.
method=self.interp_method,
)
for a in range(N)
]
[docs]
def precompute_C_table_general(
self,
t_max: float,
n_grid_t: int = 40,
x_max: float | None = None,
n_grid_x: int | None = None,
n_jobs: int = 1,
c_method: str = "dblquad",
n_gauss: int = 20,
) -> None:
"""Pre-compute C with no spatial symmetry assumed.
Two modes:
- Lazy (``x_max=None`` or ``n_grid_x=None``): 2-D
``(t1, t2)`` splines built on demand per distinct
``(x1, x2)`` pair. Best for fixed-point moments where
only a handful of pairs occur.
- Full-grid: 4-D ``(t1, t2, x1, x2)`` interpolator over the
pre-allocated grid. Build cost is
``n_grid_t² · n_grid_x²`` independent ``_C_value_direct``
calls — strongly consider ``n_jobs=-1``.
Args:
n_jobs: parallel workers for ``_C_value_direct``
evaluations; applies in **both** full-grid (fans out
across the 4-D grid) and lazy mode (fans out across
each on-demand 2-D build). ``1`` serial, ``-1`` all
cores via :mod:`joblib`.
c_method: ``'dblquad'`` (default) or ``'gauss_legendre'``
-- forwarded to :meth:`_C_value_direct` for every
full-grid point. See
:meth:`precompute_C_table_translation` for details.
n_gauss: Per-dim GL node count for
``c_method='gauss_legendre'``; default ``20``.
Requires ``self.homogeneity == 'general'``.
"""
if self.homogeneity != "general":
raise RuntimeError(
f"precompute_C_table_general requires "
f"homogeneity='general', got {self.homogeneity!r}"
)
m = self.model
t_min = m.t_min
self._c_table_range = (t_min, t_max)
if x_max is None or n_grid_x is None:
self._lazy_general = _LazyTimeSplineCache(
self, t_max, n_grid_t, mode="general",
n_jobs=n_jobs,
)
return
from scipy.interpolate import RegularGridInterpolator
N = m.n_components
ts = np.linspace(t_min, t_max, n_grid_t)
xs = np.linspace(-x_max, x_max, n_grid_x)
tasks = [
(i, j, p, q, ts[i], ts[j], xs[p], xs[q])
for i in range(n_grid_t)
for j in range(n_grid_t)
for p in range(n_grid_x)
for q in range(n_grid_x)
]
def _point(args):
i, j, p, q, t1, t2, x1, x2 = args
# TODO(d-dim): x1, x2 would be vectors
# See ``precompute_C_table_translation._point`` for the
# legacy-subclass-compat rationale.
if c_method != "dblquad":
C_mat = self._C_value_direct(
np.asarray(x1), t1, np.asarray(x2), t2,
method=c_method, n_gauss=n_gauss,
)
else:
C_mat = self._C_value_direct(
np.asarray(x1), t1, np.asarray(x2), t2,
)
return i, j, p, q, np.array([C_mat[a, a] for a in range(N)])
if n_jobs == 1 or len(tasks) <= 4:
results = [_point(t) for t in tasks]
else:
from joblib import Parallel, delayed
results = Parallel(n_jobs=n_jobs, backend="loky")(
delayed(_point)(t) for t in tasks
)
grids = [
np.zeros((n_grid_t, n_grid_t, n_grid_x, n_grid_x))
for _ in range(N)
]
for i, j, p, q, cvec in results:
for a in range(N):
grids[a][i, j, p, q] = cvec[a]
self._c_general_interpolators = [
RegularGridInterpolator(
(ts, ts, xs, xs), grids[a],
bounds_error=False, fill_value=None,
# Default 'linear' (set in PropagatorCache.__init__):
# tensor-product cubic on a steeply decaying C produces
# sign flips and O(1) overshoot in the tail; linear is
# monotone and safe. Users with smooth C and dense
# grids can opt into cubic via interp_method='cubic'.
# See tests/test_evaluate_interpolation_accuracy.py.
method=self.interp_method,
)
for a in range(N)
]
self._c_general_axes = (ts, ts, xs, xs)
def _C_value_direct(
self,
n1: Any,
t1: float,
n2: Any,
t2: float,
method: str | None = None,
n_gauss: int | None = None,
) -> np.ndarray:
"""Direct quadrature evaluation of C without consulting any cache.
Needed so the ``precompute_C_table_{translation,rotation,general}``
builders aren't contaminated by the spline cache they are
themselves populating.
When ``model.sigma2`` is provided, the full κ² is
``κ²_smooth + δ(t1−t2) · σ²(t; x1, x2)``. The δ collapses one
time integral so C picks up an extra 1-D piece::
C_white[a,b] = ∫_{t_min}^{min(t1,t2)} R(t1,τ) σ²[a,b](τ;
n1, n2) R(t2,τ) dτ
which is added to the usual 2-D dblquad result.
Args:
method: ``'dblquad'`` or ``'gauss_legendre'``. ``None``
(default) uses :attr:`c_method`.
n_gauss: Override per-dim node count for the Gauss-Legendre
method. ``None`` (default) uses :attr:`n_gauss`.
"""
method = self.c_method if method is None else method
n_gauss_val = self.n_gauss if n_gauss is None else int(n_gauss)
if method == "gauss_legendre":
return _C_value_direct_gl(
self.model, n1, t1, n2, t2, n_gauss=n_gauss_val,
)
if method != "dblquad":
raise ValueError(
f"unknown c_method {method!r}; expected 'dblquad' "
f"or 'gauss_legendre'"
)
from scipy.integrate import dblquad, quad as _quad
m = self.model
N = m.n_components
t_min = m.t_min
C_mat = np.zeros((N, N))
if m.diag_C:
for a in range(N):
def integrand(lam2: float, lam1: float, _a: int = a) -> float:
r1 = float(m.R_time(t1, lam1)) if m.iso_R else float(m.R_time(t1, lam1)[_a, _a])
kappa_mat = m.kappa2(n1, lam1, n2, lam2)
r2 = float(m.R_time(t2, lam2)) if m.iso_R else float(m.R_time(t2, lam2)[_a, _a])
return r1 * float(kappa_mat[_a, _a]) * r2
val, _ = dblquad(
integrand, t_min, t1, t_min, t2, **self.quad_opts,
)
C_mat[a, a] = val
else:
for a in range(N):
for b in range(N):
def integrand(lam2: float, lam1: float, _a: int = a, _b: int = b) -> float:
if m.iso_R:
r1 = float(m.R_time(t1, lam1))
r2 = float(m.R_time(t2, lam2))
else:
r1 = float(m.R_time(t1, lam1)[_a, _a])
r2 = float(m.R_time(t2, lam2)[_b, _b])
kappa_mat = m.kappa2(n1, lam1, n2, lam2)
return r1 * float(kappa_mat[_a, _b]) * r2
val, _ = dblquad(
integrand, t_min, t1, t_min, t2, **self.quad_opts,
)
C_mat[a, b] = val
# --- White-noise (δ-correlated) component: 1-D integral ---
if m.sigma2 is not None:
t_upper = min(t1, t2)
if t_upper > t_min:
if m.diag_C:
for a in range(N):
def w_integrand(tau: float, _a: int = a) -> float:
r1 = float(m.R_time(t1, tau)) if m.iso_R \
else float(m.R_time(t1, tau)[_a, _a])
r2 = float(m.R_time(t2, tau)) if m.iso_R \
else float(m.R_time(t2, tau)[_a, _a])
sig = m.sigma2(n1, tau, n2)
return r1 * float(sig[_a, _a]) * r2
wval, _ = _quad(
w_integrand, t_min, t_upper, **self.quad_opts,
)
C_mat[a, a] += wval
else:
for a in range(N):
for b in range(N):
def w_integrand(tau: float, _a: int = a, _b: int = b) -> float:
if m.iso_R:
r1 = float(m.R_time(t1, tau))
r2 = float(m.R_time(t2, tau))
else:
r1 = float(m.R_time(t1, tau)[_a, _a])
r2 = float(m.R_time(t2, tau)[_b, _b])
sig = m.sigma2(n1, tau, n2)
return r1 * float(sig[_a, _b]) * r2
wval, _ = _quad(
w_integrand, t_min, t_upper, **self.quad_opts,
)
C_mat[a, b] += wval
return C_mat
[docs]
def C_at_batch(
self,
t1: np.ndarray,
t2: np.ndarray,
x1: np.ndarray | float,
x2: np.ndarray | float,
) -> np.ndarray:
"""Batch-evaluate C at arbitrary ``(t1, t2, x1, x2)`` arrays.
Dispatches on :attr:`homogeneity`:
- ``'translation'``: C depends only on ``|x1 − x2|``. Uses
the 3-D ``(t1, t2, r)`` full-grid spline if built via
``precompute_C_table_translation(r_max=..., n_grid_r=...)``;
otherwise uses a lazy per-r 2-D spline cache; otherwise
(legacy 2-D spline only) falls back to
:meth:`C_diagonal_batch` — x is ignored in that case.
- ``'rotation'``: C depends only on ``x1 · x2 / (|x1| |x2|)``.
Uses the 3-D ``(t1, t2, cos)`` spline or per-cos lazy cache.
- ``'general'``: 4-D ``(t1, t2, x1, x2)`` spline, or a lazy
per-pair 2-D cache.
Args:
t1, t2: Time arrays of shape ``(n,)``.
x1, x2: Spatial coordinates. Scalar or array of shape
``(n,)`` (broadcast if scalar).
# TODO(d-dim): support (n, d).
Returns:
Array of shape ``(n, N)`` — diagonal C values per
component.
"""
t1 = np.atleast_1d(np.asarray(t1))
t2 = np.atleast_1d(np.asarray(t2))
N = self.model.n_components
# Closed-form-only: skip every spline and call the user's
# analytical C function directly. Used by
# ``Propagators(c_closed_form=..., c_closed_form_only=True)``;
# gives machine-precision agreement with the analytical C
# (no interpolation error). The vectorised contract is the
# fast path; the per-sample fallback runs a Python loop and
# is only practical for small sweeps.
if self._closed_form_only:
return self._closed_form_at_batch_diag(t1, t2, x1, x2)
n = len(t1)
def _resolve_x(x):
"""Accept scalar, (n,) array, (d,) vector, or (n, d)
array. Return shape (n,) for 1-D x, (n, d) for d-D x.
"""
arr = np.asarray(x, dtype=float)
if arr.ndim == 0:
return np.full(n, float(arr))
if arr.ndim == 1:
if arr.shape[0] == n:
return arr # (n,) scalar-per-sample
# treat as (d,) vector → broadcast to (n, d)
return np.tile(arr[None, :], (n, 1))
return arr # (n, d) already
x1_b = _resolve_x(x1)
x2_b = _resolve_x(x2)
def _r_batch(a, b):
"""``|a - b|`` (scalar x) or ``‖a - b‖`` (vector x)."""
diff = a - b
if diff.ndim == 1:
return np.abs(diff)
return np.linalg.norm(diff, axis=-1)
if self.homogeneity == "translation":
# Full-grid 3-D (t1, t2, r) spline takes precedence.
if self._c_translation_splines is not None:
r = _r_batch(x1_b, x2_b)
pts = np.stack([t1, t2, r], axis=-1)
result = np.empty((n, N))
for a, itp in enumerate(self._c_translation_splines):
result[:, a] = itp(pts)
return result
if self._lazy_translation is not None:
return self._lazy_lookup(
self._lazy_translation, t1, t2, x1_b, x2_b, N,
)
# Fall back to legacy 2-D (t1, t2) spline — x is
# effectively ignored (r=0 assumed). Matches behaviour
# of users who only called the legacy
# :meth:`precompute_C_table`.
return self.C_diagonal_batch(t1, t2)
if self.homogeneity == "rotation":
if self._c_rotation_splines is not None:
# cos per sample
cosv = np.empty(n)
for k in range(n):
cosv[k] = _rotation_cos(x1_b[k], x2_b[k])
pts = np.stack([t1, t2, cosv], axis=-1)
result = np.empty((n, N))
for a, itp in enumerate(self._c_rotation_splines):
result[:, a] = itp(pts)
return result
if self._lazy_rotation is not None:
return self._lazy_lookup(
self._lazy_rotation, t1, t2, x1_b, x2_b, N,
)
raise RuntimeError(
"homogeneity='rotation' but no rotation table has "
"been built — call precompute_C_table_rotation() first"
)
# homogeneity == "general"
if self._c_general_interpolators is not None:
# Full-grid general mode is built over a 1-D x axis
# (precompute_C_table_general uses ``np.linspace(-x_max,
# x_max, n_grid_x)``). Extending the grid to d dimensions
# would yield a (2 + 2d)-D spline whose build cost scales
# as ``n_grid_x ** (2d)`` -- exponentially expensive even
# for d=2. Reject vector inputs in full-grid mode and
# direct the user to lazy mode, which already supports
# d-dim via dict-keyed memoisation.
if x1_b.ndim > 1 or x2_b.ndim > 1:
raise NotImplementedError(
"general-mode full-grid C tables only support "
"scalar (1-D) spatial coordinates because the "
"spline grid scales as n_grid_x**(2d). Use lazy "
"mode (omit x_max / n_grid_x in "
"precompute_C_table_general) for d-dim inputs."
)
pts = np.stack([t1, t2, x1_b, x2_b], axis=-1)
result = np.empty((n, N))
for a, itp in enumerate(self._c_general_interpolators):
result[:, a] = itp(pts)
return result
if self._lazy_general is not None:
return self._lazy_lookup(
self._lazy_general, t1, t2, x1_b, x2_b, N,
)
raise RuntimeError(
"homogeneity='general' but no general table has been "
"built — call precompute_C_table_general() first"
)
def _closed_form_at_batch_diag(
self,
t1: np.ndarray,
t2: np.ndarray,
x1,
x2,
) -> np.ndarray:
"""Direct lookup for ``closed_form_only=True``: skips every
spline, calls ``self._C_value_direct`` (the user's c_fn), and
returns either per-sample diagonals or full matrices.
When ``model.diag_C`` is true the return shape is ``(n, N)``.
Otherwise the full-matrix return shape is ``(n, N, N)``.
Two contracts:
* **Vectorised** (``self._closed_form_vectorized = True``):
single call ``c_fn(x1, t1, x2, t2) -> (n, N, N)``. Recommended
for sweep performance.
* **Per-sample** (default): Python loop calling
``c_fn(x1_i, t1_i, x2_i, t2_i) -> (N, N)`` per sample.
Always correct, but ~50-100x slower than the vectorised
path on typical workloads -- only practical for small
point evaluations or testing.
"""
N = self.model.n_components
n = len(t1)
# Resolve x1 / x2 to per-sample arrays so the loop body can
# always slice ``x1_b[i]``. Accepts scalar, (n,), or (n, d).
def _broadcast_x(x):
arr = np.asarray(x, dtype=float)
if arr.ndim == 0:
return np.full(n, float(arr))
if arr.ndim == 1 and arr.shape[0] == n:
return arr
if arr.ndim == 1:
# (d,) vector → broadcast to (n, d)
return np.tile(arr[None, :], (n, 1))
return arr # (n, d) already
x1_b = _broadcast_x(x1)
x2_b = _broadcast_x(x2)
if self._closed_form_vectorized:
full = np.asarray(self._C_value_direct(x1_b, t1, x2_b, t2))
if full.ndim != 3 or full.shape[0] != n:
raise ValueError(
f"vectorised closed-form c_fn must return shape "
f"(n, N, N); got {full.shape} for n={n}, N={N}."
)
if not self.model.diag_C:
return full.astype(float, copy=False)
# Diagonal per sample: result[i, a] = full[i, a, a].
return np.einsum("iaa->ia", full).astype(float, copy=False)
# Per-sample fallback. Slow but always correct.
if self.model.diag_C:
result = np.empty((n, N), dtype=float)
for i in range(n):
xi = x1_b[i]
xj = x2_b[i]
C_mat = np.asarray(self._C_value_direct(xi, t1[i], xj, t2[i]))
result[i] = np.diag(C_mat)
return result
result_full = np.empty((n, N, N), dtype=float)
for i in range(n):
xi = x1_b[i]
xj = x2_b[i]
result_full[i] = np.asarray(self._C_value_direct(xi, t1[i], xj, t2[i]))
return result_full
def _lazy_lookup(
self,
lazy: "_LazyTimeSplineCache",
t1: np.ndarray,
t2: np.ndarray,
x1: np.ndarray,
x2: np.ndarray,
N: int,
) -> np.ndarray:
"""Look up C via per-parameter 2-D lazy splines, grouping
samples by memoization key so each distinct parameter value
triggers at most one spline build."""
n = len(t1)
result = np.empty((n, N))
keys = [lazy._make_key(x1[k], x2[k]) for k in range(n)]
seen: dict = {}
for k, key in enumerate(keys):
seen.setdefault(key, []).append(k)
for key, idxs in seen.items():
rep_k = idxs[0]
splines = lazy.get_splines(x1[rep_k], x2[rep_k])
sel = np.array(idxs)
t1_sel = t1[sel]
t2_sel = t2[sel]
for a in range(N):
result[sel, a] = splines[a](t1_sel, t2_sel, grid=False)
return result
# ---------------------------------------------------------------------------
# Step 4: Contraction & Integration
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class DynamicCouplingPromise:
"""Defers coupling-tensor materialisation to QMC-sample time.
When any entry in ``coupling_values`` is a callable (e.g. a
spacetime-dependent non-local vertex such as demo2's
``κ^{(3)}(x₁,t₁; x₂,t₂; x₃,t₃)``), the usual pre-QMC
:meth:`DiagramTerm.evaluate_coupling` path doesn't apply —
the callable's output changes with each sample's QMC time
coordinates. This class packages everything needed for the
per-sample path:
- the list of static (``ndarray``) symbol values,
- the list of dynamic (callable) symbol values,
- for each dynamic symbol, the ψ-leg spatial-label tuple
extracted from the :class:`DiagramTerm`'s ``coupling_sum`` so
we can look up each leg's time and position per sample.
The per-sample evaluator :meth:`evaluate_at` materialises the
dynamic tensors using the sample's ``(times, positions)`` and
then delegates to the static-path
:meth:`DiagramTerm.evaluate_coupling` with a fully-numeric
``coupling_values`` dict — so the contraction code stays the
same.
"""
#: The parent :class:`DiagramTerm`.
diagram_term: Any
#: Static coupling values — already materialised arrays.
static_values: dict
#: Dynamic coupling values — mapping ``name -> callable(n_list,
#: t_list)`` returning an ``ndarray``.
dynamic_values: dict
#: Per-dynamic-symbol tuple of ψ-leg spatial labels, as they
#: appear in the diagram's ``coupling_sum``.
spatial_args_by_name: dict
#: Component-index pins forwarded from ``build_integrand``.
fixed_indices: dict
[docs]
def evaluate_at(
self,
times: dict,
positions: dict,
) -> np.ndarray:
"""Materialise the dynamic callables at this sample's
``(times, positions)`` and return the per-sample
``coupling_array``.
Args:
times: ``{spatial_label: time_value}`` for all spatial
labels referenced by any dynamic symbol's legs.
positions: ``{spatial_label: position_value}`` same.
"""
sample_cv = dict(self.static_values)
for name, fn in self.dynamic_values.items():
legs = self.spatial_args_by_name[name]
t_list = np.array([times[s] for s in legs], dtype=float)
n_list = np.array([positions[s] for s in legs], dtype=float)
sample_cv[name] = np.asarray(fn(n_list, t_list))
return self.diagram_term.evaluate_coupling(
sample_cv, self.fixed_indices,
)
[docs]
def evaluate_at_batch(
self,
label_t: dict,
label_x: dict,
n_samples: int,
) -> np.ndarray:
"""Vectorised batch evaluator -- returns a ``(n_samples,)``
complex array of the contracted scalar coupling per sample.
``label_t`` and ``label_x`` map every spatial label appearing
in any dynamic symbol's legs to a ``(n_samples,)`` time
array / position respectively. Each ``label_x`` entry may be:
* a scalar (the historical / 1-D translation case) — produces
a per-leg broadcast of shape ``(n_samples,)``;
* a ``(d,)`` vector (e.g. a 3-D unit vector under
``homogeneity='rotation'``) — produces a per-leg broadcast
of shape ``(n_samples, d)``.
The user callable then sees per-leg slices of shape ``(m,)`` or
``(m, d)`` respectively (or, in the vectorised contract below,
``(m, n_samples)`` / ``(m, n_samples, d)``). All legs of one
symbol must have the same position shape; mixing scalar and
vector legs raises from ``np.stack``.
Per-symbol fast path: when the user marks a callable
``vectorized=True`` (e.g. via
:class:`~sft_wick.workflow.specs.NonLocalVertex(coupling_vectorized=True)`
or by setting ``fn.vectorized = True``), the wrapped
callable receives ``(m_legs, n_samples)`` arrays — or
``(m_legs, n_samples, d)`` for d-dim positions — in a single
call and returns a tensor of shape
``(n_samples,) + (N,)*order``. Otherwise we fall back to the
``n_samples`` per-sample calls of :meth:`evaluate_at`.
Either way, we still loop in Python over ``n_samples`` to
contract each sample's tensor down to a scalar via
:meth:`DiagramTerm.evaluate_coupling`. Vectorising the
contraction itself across a sample axis is the deferred
prop-indexed-dynamic refactor and is not done here.
Raises ``NotImplementedError`` if any sample's contracted
coupling is not scalar (the prop-indexed case locked by
``DC1`` in ``tests/test_dynamic_coupling.py``).
"""
# Pre-build (m, n_samples) leg arrays once per symbol, so the
# per-sample loop only does scalar slicing.
per_symbol_legs: dict = {}
for name in self.dynamic_values:
legs = self.spatial_args_by_name[name]
t_2d = np.stack(
[np.asarray(label_t[s], dtype=float) for s in legs],
axis=0,
) # shape (m, n_samples)
leg_x_arrs = []
for s in legs:
x_val = np.asarray(label_x[s], dtype=float)
# Positions per leg are *fixed* across samples (they
# are the integration's external coordinates), so we
# broadcast a leading sample axis of length n_samples.
#
# Two regimes:
# * scalar position (x_val.ndim == 0): broadcast to
# (n_samples,) -- the historical case; produces a
# final n_arr of shape (m, n_samples) and a per-
# sample slice of shape (m,) for the user callable.
# * d-dim vector position (x_val.ndim >= 1): broadcast
# to (n_samples, *x_val.shape) -- produces a final
# n_arr of shape (m, n_samples, *vec_shape) and a
# per-sample slice of shape (m, *vec_shape) for the
# user callable. The user callable is responsible
# for handling its own per-leg position shape.
if x_val.ndim == 0:
leg_x_arrs.append(np.full((n_samples,), float(x_val)))
else:
leg_x_arrs.append(
np.broadcast_to(
x_val[None, ...], (n_samples,) + x_val.shape
)
)
# Mixing scalar and vector legs (or different vector dims
# across legs) makes ``np.stack`` raise -- a clear shape
# error that is more useful than a silent broadcast.
n_arr = np.stack(leg_x_arrs, axis=0)
# scalar legs -> (m, n_samples)
# d-dim legs -> (m, n_samples, *vec_shape)
per_symbol_legs[name] = (n_arr, t_2d)
# Vectorised symbols: single fn call yields per-sample tensor
# stack of shape (n_samples, *kappa_shape). For the rest, we
# fall back to the per-sample call inside the loop.
per_sample_tensors: dict = {}
for name, fn in self.dynamic_values.items():
if getattr(fn, "vectorized", False):
n_arr, t_2d = per_symbol_legs[name]
stacked = np.asarray(fn(n_arr, t_2d))
if stacked.shape[0] != n_samples:
raise ValueError(
f"vectorized callable {name!r} returned shape "
f"{stacked.shape}; expected leading axis of "
f"length n_samples={n_samples}."
)
per_sample_tensors[name] = stacked
# Probe shape on sample 0 for an early NotImplementedError on
# prop-indexed dynamic. Mirrors the per-sample loop in
# ``integrate_moment_qmc_vectorized`` -- callers rely on this
# raise to signal "use a static coupling tensor instead".
sample_cv0 = dict(self.static_values)
for name, fn in self.dynamic_values.items():
if name in per_sample_tensors:
sample_cv0[name] = per_sample_tensors[name][0]
else:
n_arr, t_2d = per_symbol_legs[name]
sample_cv0[name] = np.asarray(fn(n_arr[:, 0], t_2d[:, 0]))
coup0 = self.diagram_term.evaluate_coupling(
sample_cv0, self.fixed_indices,
)
if coup0.ndim != 0:
raise NotImplementedError(
"Dynamic coupling with propagator-indexed "
"contraction is not yet supported. Either "
"use a spacetime-constant coupling tensor, or "
"provide a fully contracted scalar callable."
)
# ------------------------------------------------------------
# Vectorised fast path (default).
# ------------------------------------------------------------
# Materialise every dynamic symbol's per-sample tensor stack as
# a single ``(n_samples, *kappa_shape)`` array, then call
# ``DiagramTerm.evaluate_coupling_batched`` once -- replacing
# the inner ``n_samples`` calls to ``_eval_symbolic`` with one
# vectorised pass.
#
# If the symbolic ``coupling_sum`` contains a node type the
# batched evaluator cannot handle (e.g. a ``Propagator`` or
# ``IntegralOver`` slipped into a coupling expression), the
# batched path raises ``NotImplementedError`` and we fall
# back to the original per-sample loop below. This mirrors
# the safety net documented in
# :func:`sft_wick.perturbation._eval_symbolic_batched`.
try:
batched_cv: dict = dict(self.static_values)
for name, fn in self.dynamic_values.items():
if name in per_sample_tensors:
batched_cv[name] = per_sample_tensors[name]
else:
n_arr, t_2d = per_symbol_legs[name]
# Per-sample callable: build the (n_samples, ...)
# stack ourselves so the contraction is then
# a single ufunc pass. This still pays one
# callable invocation per sample (unavoidable
# without ``vectorized=True``), but the symbolic
# contraction cost is amortised away.
sample0 = np.asarray(fn(n_arr[:, 0], t_2d[:, 0]))
stack = np.empty(
(n_samples,) + sample0.shape,
dtype=sample0.dtype,
)
stack[0] = sample0
for s in range(1, n_samples):
stack[s] = np.asarray(
fn(n_arr[:, s], t_2d[:, s])
)
batched_cv[name] = stack
couplings = self.diagram_term.evaluate_coupling_batched(
batched_cv,
n_samples=n_samples,
fixed_indices=self.fixed_indices,
)
# ``evaluate_coupling_batched`` returns shape
# ``(n_samples,) + prop_shape``; for the dynamic path we
# already verified ``coup0.ndim == 0`` above so the
# output is ``(n_samples,)``.
if couplings.ndim != 1 or couplings.shape[0] != n_samples:
raise NotImplementedError(
"evaluate_coupling_batched returned non-scalar "
"per-sample output; falling back to scalar loop."
)
return np.ascontiguousarray(couplings).astype(complex, copy=False)
except NotImplementedError:
pass # fall through to scalar per-sample loop below
couplings = np.empty(n_samples, dtype=complex)
couplings[0] = complex(coup0)
for s in range(1, n_samples):
sample_cv = dict(self.static_values)
for name, fn in self.dynamic_values.items():
if name in per_sample_tensors:
sample_cv[name] = per_sample_tensors[name][s]
else:
n_arr, t_2d = per_symbol_legs[name]
sample_cv[name] = np.asarray(fn(n_arr[:, s], t_2d[:, s]))
couplings[s] = complex(
self.diagram_term.evaluate_coupling(
sample_cv, self.fixed_indices,
)
)
return couplings
[docs]
@dataclass(frozen=True)
class DiagramIntegrand:
"""A diagram's contribution decomposed for numerical integration.
Combines coupling coefficients (Step 1) with spatial analysis (Step 2)
for efficient numerical evaluation.
The integrand has the factored form (when ``iso_R=True``)::
R_product(times) × Σ_{prop_idx} coeff[idx] × Π_C C_values[idx]
"""
#: The underlying DiagramTerm.
diagram_term: "DiagramTerm"
#: Spatial analysis result.
spatial: SpatialStructure
#: Coupling coefficient array from ``evaluate_coupling()``.
#: When :attr:`dynamic_coupling` is set, this is a placeholder
#: zero array and the real coupling value is computed per-sample
#: in the integrator.
coupling_array: np.ndarray
#: Fixed component indices (e.g. ``{'a': 0, 'b': 1}``).
#: Used by the QMC/GL integrators to resolve C-propagator
#: component indices that are not summation variables.
fixed_indices: dict[str, int] = field(default_factory=dict)
#: Optional :class:`DynamicCouplingPromise` carrying per-sample
#: coupling materialisation logic for spacetime-dependent
#: (callable) coupling values. ``None`` ⇒ the fully-static
#: fast path is used (all coupling values were ndarrays).
dynamic_coupling: Any = None
[docs]
def evaluate(
self,
times: dict[str, float],
directions: dict[str, Any],
cache: PropagatorCache,
) -> complex:
"""Evaluate the integrand at specific time + direction coordinates.
Args:
times: ``{spatial_point: time_value}`` for ALL spatial points.
Aliased (non-representative) legs of equal-time non-local
vertices may be absent --- their values are filled in from
the canonical representatives inside this method.
directions: ``{direction_var: value}`` for independent direction
variables (one per R-connected component, keyed by
the representative name from ``spatial.direction_map``).
cache: A :class:`PropagatorCache` for propagator evaluation.
Returns:
Scalar value of the integrand (complex if coupling is complex).
"""
dt = self.diagram_term
spatial = self.spatial
coeff = self.coupling_array
model = cache.model
# Expand the times dict to fill in aliased legs of any
# equal_time non-local vertex; downstream propagator lookups
# (``cache.R_product``, ``times[sp_l]`` etc.) then work without
# special-case branches.
if spatial.equal_time_aliases:
times = dict(times)
for non_rep, rep in spatial.equal_time_aliases:
if non_rep not in times and rep in times:
times[non_rep] = times[rep]
# --- R product (scalar when iso_R) ---
#
# ``PropagatorCache.R_product`` is intentionally scalar-only.
# For matrix-valued R, resolve component indices alongside the
# C contraction below so order-0 R diagrams do not try to cast an
# (N, N) matrix to float.
r_val = (
cache.R_product(_kept_r_propagators(spatial), times)
if model.iso_R else 1.0
)
# --- Evaluate C propagators ---
prop_idx = dt.propagator_indices
if not prop_idx:
# No propagator indices → scalar coupling, evaluate C without
# summation, but still honour the integrand's
# ``fixed_indices`` (observable component labels like
# ``a``, ``b``) — without this the C matrix falls through
# to ``C_mat.trace()`` and picks up a spurious factor of
# N at order 0 for ``⟨φ_a(x) φ_b(y)⟩``-style observables.
c_val = 1.0
for sp_l, sp_r, il, ir in spatial.c_propagators:
dir_l = spatial.direction_map[sp_l]
dir_r = spatial.direction_map[sp_r]
n_l = directions.get(dir_l, directions.get(sp_l))
n_r = directions.get(dir_r, directions.get(sp_r))
C_mat = cache.C_value(n_l, times[sp_l], n_r, times[sp_r])
a = self._resolve_component(il, self.fixed_indices)
b = self._resolve_component(ir, self.fixed_indices)
if a is not None and b is not None:
c_val *= C_mat[a, b]
else:
c_val *= C_mat.trace()
if not model.iso_R:
c_val *= self._evaluate_r_product_general(
times, cache, self.fixed_indices,
)
return complex(r_val * complex(coeff) * c_val)
# --- Map each C propagator to its propagator-index axis ---
idx_names = [name for name, _ in prop_idx]
idx_name_to_axis = {name: ax for ax, name in enumerate(idx_names)}
if model.iso_R and model.diag_C:
return self._evaluate_diag_fast(
r_val, times, directions, cache, idx_names, idx_name_to_axis
)
else:
return self._evaluate_general(
r_val, times, directions, cache, idx_names, idx_name_to_axis
)
def _evaluate_diag_fast(
self,
r_val: float,
times: dict[str, float],
directions: dict[str, Any],
cache: PropagatorCache,
idx_names: list[str],
idx_name_to_axis: dict[str, int],
) -> complex:
"""Fast path for iso_R + diag_C case.
Each C propagator contributes a diagonal vector; contraction is
element-wise multiplication then summation.
"""
spatial = self.spatial
coeff = self.coupling_array
n_axes = coeff.ndim
contracted = coeff.copy()
for sp_l, sp_r, il, ir in spatial.c_propagators:
dir_l = spatial.direction_map[sp_l]
dir_r = spatial.direction_map[sp_r]
n_l = directions.get(dir_l, directions.get(sp_l))
n_r = directions.get(dir_r, directions.get(sp_r))
c_diag = cache.C_diagonal(n_l, times[sp_l], n_r, times[sp_r])
# Diagonal: il == ir (after apply_diagonal). Find the axis.
idx_name = il # = ir for diagonal propagators
if idx_name is None:
# Isotropic C (iso_C): no index → scalar trace
contracted = contracted * c_diag.sum()
continue
axis = idx_name_to_axis.get(idx_name)
if axis is None:
# Literal index (e.g. '1') not in summation — resolve directly
a = DiagramIntegrand._resolve_component(idx_name, {})
if a is not None:
contracted = contracted * c_diag[a]
else:
contracted = contracted * c_diag.sum()
continue
# Broadcast c_diag along the correct axis
shape = [1] * n_axes
shape[axis] = len(c_diag)
contracted = contracted * c_diag.reshape(shape)
total = contracted.sum()
return complex(r_val * total)
def _evaluate_general(
self,
r_val: float,
times: dict[str, float],
directions: dict[str, Any],
cache: PropagatorCache,
idx_names: list[str],
idx_name_to_axis: dict[str, int],
) -> complex:
"""General path: explicit loop over propagator index combinations."""
spatial = self.spatial
coeff = self.coupling_array
dt = self.diagram_term
prop_idx = dt.propagator_indices
prop_shape = tuple(dim for _, dim in prop_idx)
total = complex(0)
for pidx in np.ndindex(*prop_shape):
c_val = complex(coeff[pidx])
if c_val == 0:
continue
# Merge the integrand's fixed component indices (e.g.
# observable labels like ``a``, ``b``) into the
# per-iteration summation ``idx_map``, matching the
# vectorised path — without this merge, propagator legs
# that reference observable labels fall through to
# ``C_mat.trace()`` (spurious factor of N).
idx_map = {
**self.fixed_indices,
**{name: val for name, val in zip(idx_names, pidx)},
}
for sp_l, sp_r, il, ir in spatial.c_propagators:
dir_l = spatial.direction_map[sp_l]
dir_r = spatial.direction_map[sp_r]
n_l = directions.get(dir_l, directions.get(sp_l))
n_r = directions.get(dir_r, directions.get(sp_r))
C_mat = cache.C_value(n_l, times[sp_l], n_r, times[sp_r])
a = self._resolve_component(il, idx_map)
b = self._resolve_component(ir, idx_map)
if a is not None and b is not None:
c_val *= C_mat[a, b]
else:
c_val *= C_mat.trace()
# For non-iso R: include R matrix elements
if not cache.model.iso_R:
c_val *= self._evaluate_r_product_general(times, cache, idx_map)
total += c_val
return r_val * total
def _evaluate_r_product_general(
self,
times: dict[str, float],
cache: PropagatorCache,
idx_map: dict[str, int],
) -> complex:
"""Evaluate matrix-valued R propagators for one component assignment."""
result: complex = 1.0
for sl, sr in _kept_r_propagators(self.spatial):
R_mat = np.asarray(cache.R_time(times[sl], times[sr]))
r_prop = self._find_r_propagator(sl, sr)
if r_prop and r_prop.index_left and r_prop.index_right:
a = self._resolve_component(r_prop.index_left, idx_map)
b = self._resolve_component(r_prop.index_right, idx_map)
if a is not None and b is not None:
result *= complex(R_mat[a, b])
else:
result *= complex(np.trace(R_mat))
else:
result *= complex(np.trace(R_mat))
return result
def _find_r_propagator(self, sl: str, sr: str) -> Propagator | None:
"""Find the R propagator matching spatial_left=sl, spatial_right=sr."""
for p in self.diagram_term.propagators:
if p.kind == "R" and p.spatial_left == sl and p.spatial_right == sr:
return p
return None
@staticmethod
def _resolve_component(
idx_name: str | None, idx_map: dict[str, int]
) -> int | None:
"""Resolve a component index name to a 0-indexed integer."""
if idx_name is None:
return None
if idx_name in idx_map:
return idx_map[idx_name]
try:
return int(idx_name) - 1 # 1-indexed literal
except ValueError:
return None
@staticmethod
def _resolve_group_x(
spatial: "SpatialStructure",
positions: dict[str, Any] | None,
default: Any,
) -> dict[str, Any]:
"""Map each direction group (by its direction-variable name) to
a single spatial coordinate.
Priority for each group:
1. If the group contains an external point whose position the
caller specified in ``positions``, use that.
2. Otherwise fall back to ``default`` (typically 0 for
integration-only groups, or whatever ``direction`` kwarg
supplies for backward compat).
This is what flows into ``cache.C_at_batch`` as the endpoint
x-value when the cache has a spatial table built (of any
homogeneity kind).
``positions`` values may be scalars (1-D / legacy) or
arbitrary-dimensional vectors. The ``default`` likewise may
be a scalar or a vector. The returned dict preserves whatever
shape the user passed -- downstream
``PropagatorCache.C_at_batch`` and the spatial Kappa2
wrappers (``_SeparableTranslationKappa2``, ``_rotation_cos``,
...) all accept either form.
"""
positions = positions or {}
group_x: dict[str, Any] = {}
for group in spatial.direction_groups:
dvar_sample = spatial.direction_map[next(iter(group))]
x_val: Any = default
for p in group:
if p in positions:
x_val = positions[p]
break
group_x[dvar_sample] = x_val
return group_x
def _evaluate_zero_dimensional(
self,
lambda_f: float,
cache: PropagatorCache,
*,
direction: Any = 0,
positions: dict[str, Any] | None = None,
) -> float:
"""Evaluate an integrand with no surviving time variables.
Dynamic non-local couplings still need to be materialised here. This
occurs for already-R-contracted vertices whose absorbed R legs alias
directly onto fixed external points.
"""
spatial = self.spatial
if _cache_has_spatial_table(cache):
directions = self._resolve_group_x(spatial, positions, direction)
else:
dir_vars = set(spatial.direction_map.values())
directions = {d: direction for d in dir_vars}
fixed_times = {v: lambda_f for v in spatial.external_points}
if self.dynamic_coupling is None:
val = self.evaluate(fixed_times, directions, cache)
return float(val.real if val.imag == 0 else abs(val))
n_samples = 1
fixed_t = np.full(n_samples, lambda_f)
et_alias = dict(spatial.equal_time_aliases or ())
def _times(var: str) -> np.ndarray:
_ = et_alias.get(var, var)
return fixed_t
r_product = np.ones(n_samples)
for sl, sr in _kept_r_propagators(spatial):
r_product *= cache.R_time_batch(_times(sl), _times(sr))
spatial_aware = _cache_has_spatial_table(cache)
group_x: dict[str, Any] | None = None
if spatial_aware:
group_x = self._resolve_group_x(spatial, positions, direction)
def _lookup_C(sp_l, sp_r, t_l, t_r):
if not spatial_aware:
return cache.C_diagonal_batch(t_l, t_r)
x_l = group_x[spatial.direction_map[sp_l]] # type: ignore[index]
x_r = group_x[spatial.direction_map[sp_r]] # type: ignore[index]
return cache.C_at_batch(t_l, t_r, x_l, x_r)
fi = self.fixed_indices
c_product = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
C_batch = _lookup_C(sp_l, sp_r, _times(sp_l), _times(sp_r))
a_i = (
DiagramIntegrand._resolve_component(il, fi)
if il is not None else None
)
b_i = (
DiagramIntegrand._resolve_component(ir, fi)
if ir is not None else None
)
c_product *= _select_C_batch(C_batch, a_i, b_i)
all_spatial_labels = set(spatial.direction_map.keys())
label_t = {lab: _times(lab) for lab in all_spatial_labels}
if spatial_aware:
label_x = {
lab: group_x[spatial.direction_map[lab]] # type: ignore[index]
for lab in all_spatial_labels
}
else:
label_x = {
lab: float(direction)
for lab in all_spatial_labels
}
couplings = self.dynamic_coupling.evaluate_at_batch(
label_t=label_t,
label_x=label_x,
n_samples=n_samples,
)
values = couplings.real * r_product * c_product
return float(values[0])
[docs]
def make_scipy_integrand(
self,
external_times: dict[str, float],
external_directions: dict[str, Any],
cache: PropagatorCache,
) -> Callable:
"""Return a callable for ``scipy.integrate.nquad``.
The returned function accepts the integration time variables as
positional arguments in the order of
``spatial.time_integration_vars`` and returns the integrand value.
External times and directions are baked in.
Args:
external_times: ``{point_name: time}`` for external spatial points.
external_directions: ``{direction_var: value}`` for all independent
direction variables.
cache: A :class:`PropagatorCache`.
Returns:
Callable ``f(*time_args) → float``.
"""
spatial = self.spatial
int_vars = spatial.time_integration_vars
def integrand(*time_args: float) -> float:
times = dict(external_times)
for var, val in zip(int_vars, time_args):
times[var] = val
result = self.evaluate(times, external_directions, cache)
return result.real if result.imag == 0 else abs(result)
return integrand
[docs]
def integration_bounds(
self,
external_times: dict[str, float],
t_min: float = 0.0,
) -> list:
"""Return integration bounds for ``scipy.integrate.nquad``.
Time ordering constraints from R causality translate to
variable-dependent bounds. For ``nquad``, bounds can be
callables ``f(*earlier_args) → (lo, hi)``.
The variables are ordered as ``spatial.time_integration_vars``.
Returns:
List of bounds, one per integration variable. Each is either
``(lo, hi)`` or a callable for dependent bounds.
"""
spatial = self.spatial
int_vars = list(spatial.time_integration_vars)
# Build map: for each integration var, what is its upper bound?
# Upper bound = min(t_later) where t_later > t_var in causal order.
# If t_later is external, it's a constant; if integration, it's a var.
upper_bounds: dict[str, list[str]] = defaultdict(list)
for earlier, later in spatial.time_orderings:
if earlier in int_vars:
upper_bounds[earlier].append(later)
# Build bounds list
bounds: list = []
for i, var in enumerate(int_vars):
ub_sources = upper_bounds.get(var, [])
if not ub_sources:
# No causal constraint — integrate from t_min to some max
max_t = max(external_times.values()) if external_times else 1.0
bounds.append((t_min, max_t))
else:
# Upper bound is min of all constraining times
# For nquad, bounds[i] can be a function of the *previous* args
# nquad calls bounds[i](*args[:i]) — but our vars may be in
# different positions. We need to handle this carefully.
#
# nquad convention: f(x0, x1, ..., x_{n-1}) where x0 is innermost.
# bounds[i] is called with (x0, ..., x_{i-1}).
# Our int_vars[0] is innermost (earliest time).
_var = var
_ub = ub_sources
_ext = external_times
_int_vars = int_vars
_t_min = t_min
def make_bound(
ub: list[str],
ext: dict[str, float],
ivars: list[str],
lo: float,
) -> Callable:
def bound_func(*prev_args: float) -> tuple[float, float]:
# prev_args correspond to int_vars[:current_index]
hi_vals: list[float] = []
for src in ub:
if src in ext:
hi_vals.append(ext[src])
else:
# Find position of src in int_vars
src_idx = ivars.index(src)
if src_idx < len(prev_args):
hi_vals.append(prev_args[src_idx])
else:
hi_vals.append(ext.get(src, 1.0))
hi = min(hi_vals) if hi_vals else 1.0
return (lo, hi)
return bound_func
bounds.append(make_bound(_ub, _ext, _int_vars, _t_min))
return bounds
[docs]
def to_latex(self) -> str:
r"""Render the decomposed integrand with explicit coordinates.
Shows direction identifications, time-ordered integrals,
factored R product, and the component sum over C propagators.
"""
from .expressions import _latex_index
spatial = self.spatial
dt = self.diagram_term
parts: list[str] = []
# --- Direction identification note ---
for group in spatial.direction_groups:
if len(group) > 1:
rep = None
for p in sorted(group):
if p in spatial.external_points:
rep = p
break
if rep is None:
rep = sorted(group)[0]
others = sorted(group - {rep})
if others:
parts.append(
rf"\text{{[all directions}} = \hat{{n}}_{{{_latex_index(rep)}}}]"
)
# --- Time-ordered integrals ---
body_parts: list[str] = []
# R product
r_strs = []
for sl, sr in spatial.r_propagators:
r_strs.append(rf"R(t_{{{_latex_index(sl)}}}, t_{{{_latex_index(sr)}}})")
if r_strs:
body_parts.append(" ".join(r_strs))
# Summation + C factors
prop_idx = dt.propagator_indices
sum_prefix = ""
for name, dim in prop_idx:
sum_prefix += rf"\sum_{{{_latex_index(name)}=1}}^{{{dim}}} "
c_strs = []
for sp_l, sp_r, il, ir in spatial.c_propagators:
dir_l = spatial.direction_map[sp_l]
dir_r = spatial.direction_map[sp_r]
idx_str = ""
if il is not None and ir is not None:
idx_str = f"_{{{_latex_index(il)}{_latex_index(ir)}}}"
c_strs.append(
rf"C{idx_str}({dir_l}, t_{{{_latex_index(sp_l)}}};\, "
rf"{dir_r}, t_{{{_latex_index(sp_r)}}})"
)
coeff_str = r"\mathrm{coeff}"
if prop_idx:
idx_sub = ",".join(_latex_index(n) for n, _ in prop_idx)
coeff_str = rf"\mathrm{{coeff}}_{{{idx_sub}}}"
inner = rf"{sum_prefix}{coeff_str} " + " ".join(c_strs)
body_parts.append(inner)
body = r" \cdot ".join(body_parts)
# Wrap with time integrals
for var in reversed(list(spatial.time_integration_vars)):
body = rf"\int \mathrm{{d}}t_{{{_latex_index(var)}}}\, {body}"
result = body
if parts:
result = " ".join(parts) + r" \quad " + body
return result
[docs]
def integrate_moment_qmc(
self,
lambda_f: float,
cache: PropagatorCache,
t_min: float = 0.0,
direction: Any = 0,
n_samples: int = 2**14,
seed: int | None = None,
positions: dict[str, float] | None = None,
integrate_over: Any = None,
) -> tuple[float, float]:
"""Integrate over all time variables using Quasi-Monte Carlo.
Scalar-loop sibling of
:meth:`integrate_moment_qmc_vectorized`; see that method's
docstring for the ``integrate_over`` kwarg semantics.
"""
from scipy.stats import qmc
if self.dynamic_coupling is not None:
raise NotImplementedError(
"method='qmc_scalar' does not support spacetime-dependent "
"(callable) couplings. The scalar loop evaluates the static "
"coupling_array placeholder; use method='qmc_vectorized' or "
"method='gauss_legendre' instead."
)
spatial = self.spatial
int_vars_parents_first = list(reversed(spatial.time_integration_vars))
ext_vars = list(spatial.external_points)
ext_int_set = _resolve_integrate_over(integrate_over, ext_vars)
ext_integrated = [v for v in ext_vars if v in ext_int_set]
ext_fixed = [v for v in ext_vars if v not in ext_int_set]
sobol_vars = ext_integrated + int_vars_parents_first
n_ext_int = len(ext_integrated)
n_total = len(sobol_vars)
if _cache_has_spatial_table(cache):
directions = self._resolve_group_x(spatial, positions, direction)
else:
dir_vars = set(spatial.direction_map.values())
directions = {d: direction for d in dir_vars}
if n_total == 0:
fixed_times = {v: lambda_f for v in ext_fixed}
val = self.evaluate(fixed_times, directions, cache)
return (val.real, 0.0)
parent_map: dict[str, list[str]] = defaultdict(list)
for earlier, later in spatial.time_orderings:
if earlier in int_vars_parents_first:
parent_map[earlier].append(later)
# Generate Sobol samples in [0,1]^d
sampler = qmc.Sobol(d=n_total, seed=seed)
u_samples = sampler.random(n_samples) # (n_samples, n_total)
# Evaluate integrand at each sample point
values = np.empty(n_samples)
span = lambda_f - t_min
for s in range(n_samples):
u = u_samples[s]
times: dict[str, float] = {v: lambda_f for v in ext_fixed}
jacobian = 1.0
# Integrated externals: free in [t_min, lambda_f]
for k in range(n_ext_int):
t = t_min + u[k] * span
times[ext_integrated[k]] = t
jacobian *= span
# Internal vars: bounded by causal parents
for k, var in enumerate(int_vars_parents_first):
idx = n_ext_int + k
parents = parent_map.get(var, [])
if parents:
hi = min(times[p] for p in parents if p in times)
else:
hi = lambda_f
lo = t_min
width = hi - lo
if width <= 0:
jacobian = 0.0
times[var] = lo
else:
times[var] = lo + u[idx] * width
jacobian *= width
if jacobian == 0.0:
values[s] = 0.0
continue
result = self.evaluate(times, directions, cache)
val = result.real if result.imag == 0 else abs(result)
values[s] = val * jacobian
estimate = float(np.mean(values))
# Standard error from 8 batched sub-means
n_batches = min(8, n_samples)
batch_size = n_samples // n_batches
batch_means = np.array([
np.mean(values[i * batch_size:(i + 1) * batch_size])
for i in range(n_batches)
])
std_error = float(np.std(batch_means, ddof=1) / np.sqrt(n_batches))
return (estimate, std_error)
[docs]
def integrate_moment_qmc_vectorized(
self,
lambda_f: float,
cache: PropagatorCache,
t_min: float = 0.0,
direction: Any = 0,
n_samples: int = 2**14,
seed: int | None = None,
positions: dict[str, float] | None = None,
integrate_over: Any = None,
) -> tuple[float, float]:
"""Vectorized QMC integration — no Python loop over samples.
Same algorithm as :meth:`integrate_moment_qmc` but evaluates
all Sobol samples simultaneously using batch propagator lookups.
Requires ``precompute_C_table`` for the batch C evaluation.
For the ``iso_R + diag_C`` case with scalar coupling (no
propagator indices), the integrand at each sample is::
coupling × prod_R R(t_l, t_r) × prod_C C_diag(t_l, t_r)
where all R and C values are looked up in batch.
Args:
positions: Optional mapping ``{point_name: spatial_coord}``
(e.g. ``{'x': 0.0, 'y': 0.5}``). When the cache has a
spatial table built (any :attr:`~PropagatorCache.homogeneity`
kind), the x-coordinate of each propagator endpoint is
derived from its direction group's external-point
position and flows through ``cache.C_at_batch``.
Ignored when no spatial table is built (legacy
behaviour — ``C_diagonal_batch`` has no x-dependence
regardless).
integrate_over: Controls which **external** points have
their time integrated over ``[t_min, lambda_f]``.
- ``None`` (default, physics observable):
**all externals held fixed at ``lambda_f``** —
this gives the equal-time correlator
``⟨φ(t_f) φ(t_f)⟩`` that demo notebooks and MC
data compare against.
- ``"all"``: all externals integrated — the
time-integrated moment
``⟨∫₀^{t_f}φ(t) dt · ∫₀^{t_f}φ(t') dt'⟩``.
Useful e.g. for weak-lensing line-of-sight
integrals of the Sachs field.
- Iterable of external point names (subset):
those listed are integrated, the rest fixed at
``lambda_f``. Enables mixed observables such as
an integrated source × a detector field.
Internal vertex times are always integrated; their
causal parent-map uses the (possibly fixed)
external times as upper bounds.
Returns:
``(estimate, std_error)``
"""
from scipy.stats import qmc
spatial = self.spatial
int_vars_pf = list(reversed(spatial.time_integration_vars))
ext_vars = list(spatial.external_points)
# Partition externals into "integrated" vs "fixed at lambda_f".
ext_int_set = _resolve_integrate_over(integrate_over, ext_vars)
ext_integrated = [v for v in ext_vars if v in ext_int_set]
ext_fixed = [v for v in ext_vars if v not in ext_int_set]
n_ext_int = len(ext_integrated)
# Sobol-dimensioned vars = integrated externals + internals.
sobol_vars = ext_integrated + int_vars_pf
n_total = len(sobol_vars)
if n_total == 0:
# No integration variables, but callable couplings can still be
# dynamic after R-absorption aliases their legs to fixed externals.
val = self._evaluate_zero_dimensional(
lambda_f, cache, direction=direction, positions=positions,
)
return (val, 0.0)
if spatial.r_propagators and not cache.model.iso_R:
raise NotImplementedError(
"method='qmc_vectorized' currently supports scalar "
"iso_R=True response propagators only. Use method='qmc' "
"or method='qmc_scalar' for matrix-valued R."
)
# Build causal parent map (per-internal-var upper bound list).
parent_map: dict[str, list[str]] = defaultdict(list)
for earlier, later in spatial.time_orderings:
if earlier in int_vars_pf:
parent_map[earlier].append(later)
# Sobol samples
sampler = qmc.Sobol(d=n_total, seed=seed)
u = sampler.random(n_samples)
span = lambda_f - t_min
# Map u -> times with causal bounds (vectorized)
times_arr = np.empty((n_samples, n_total))
jacobians = np.ones(n_samples)
# Integrated external vars: free in [t_min, lambda_f]
for k in range(n_ext_int):
times_arr[:, k] = t_min + u[:, k] * span
jacobians *= span
# Internal vars: bounded by parents. Parents may be
# (a) integrated externals — pull from times_arr; (b) fixed
# externals — use the fixed ``lambda_f`` value; (c) other
# internal vars — pull from times_arr.
for k, var in enumerate(int_vars_pf):
idx = n_ext_int + k
parents = parent_map.get(var, [])
if parents:
hi = np.full(n_samples, lambda_f)
for p in parents:
if p in int_vars_pf:
p_idx = n_ext_int + int_vars_pf.index(p)
hi = np.minimum(hi, times_arr[:, p_idx])
elif p in ext_integrated:
p_idx = ext_integrated.index(p)
hi = np.minimum(hi, times_arr[:, p_idx])
else:
# Fixed external — its time is lambda_f.
hi = np.minimum(hi, lambda_f)
else:
hi = np.full(n_samples, lambda_f)
width = hi - t_min
valid = width > 0
times_arr[:, idx] = np.where(valid, t_min + u[:, idx] * width, t_min)
jacobians = np.where(valid, jacobians * width, 0.0)
# Build variable-name to column lookup. Fixed externals are
# NOT in times_arr; they get a special lookup that returns a
# constant ``lambda_f`` array.
var_to_col = {var: i for i, var in enumerate(sobol_vars)}
fixed_t = np.full(n_samples, lambda_f)
# Equal-time alias map: see analyze_spatial / SpatialStructure.
_et_alias = dict(spatial.equal_time_aliases or ())
def _times(var: str) -> np.ndarray:
"""Return the per-sample time array for ``var`` — pulls
from ``times_arr`` when integrated, or the constant
``lambda_f`` array when fixed. Non-representative legs of
an ``equal_time`` non-local vertex are transparently
redirected to their canonical representative so the
callable sees a single shared time across the m legs."""
var = _et_alias.get(var, var)
col = var_to_col.get(var)
if col is not None:
return times_arr[:, col]
return fixed_t
# --- Vectorized integrand evaluation ---
dt = self.diagram_term
coeff = self.coupling_array
prop_idx = dt.propagator_indices
# R product (vectorized) — skip absorbed R's per
# ``DiagramTerm.r_absorbed_pairs``; those factors are already
# baked into the κ^(m)_R callable.
r_product = np.ones(n_samples)
for sl, sr in _kept_r_propagators(spatial):
t_l = _times(sl)
t_r = _times(sr)
r_product *= cache.R_time_batch(t_l, t_r)
fi = self.fixed_indices
# --- Spatial-aware dispatch ---
# Decide once per call how to evaluate each C propagator.
# We route through ``C_at_batch`` whenever the cache has a
# spatial table built (of any homogeneity kind); otherwise
# fall back to the legacy ``C_diagonal_batch`` which ignores
# x and matches the pre-Phase-5 behaviour bit-identically.
spatial_aware = _cache_has_spatial_table(cache)
group_x: dict[str, float] | None = None
if spatial_aware:
group_x = self._resolve_group_x(spatial, positions, direction)
def _lookup_C(sp_l: str, sp_r: str, t_l: np.ndarray,
t_r: np.ndarray) -> np.ndarray:
"""Evaluate C at the batch of (t_l, t_r) samples, carrying
the appropriate x-coordinate when the cache is
spatial-aware."""
if not spatial_aware:
return cache.C_diagonal_batch(t_l, t_r)
x_l = group_x[spatial.direction_map[sp_l]] # type: ignore[index]
x_r = group_x[spatial.direction_map[sp_r]] # type: ignore[index]
return cache.C_at_batch(t_l, t_r, x_l, x_r)
if self.dynamic_coupling is not None:
# --- Per-sample (dynamic) coupling path ---
# Triggered when any coupling_value passed to
# ``build_integrand`` was callable (e.g. a
# spacetime-dependent non-local vertex like demo2's
# ``κ^{(3)}``). Per-sample cost is one callable
# invocation per dynamic symbol × one cheap
# :meth:`DiagramTerm.evaluate_coupling` substitution.
#
# Vectorisation strategy:
# * ``r_product`` is already (n_samples,)-batched above.
# * ``c_product`` is hoisted out of the per-sample loop
# and computed via batched ``C_at_batch`` calls in
# parity with the static scalar path below (line ~2235).
# * Per-symbol ``label_t`` / ``label_x`` arrays are built
# ONCE so the inner loop only does scalar slicing.
# * If all active dynamic symbols set
# ``vectorized=True``, ``DynamicCouplingPromise``
# collects (n_samples,) coupling values in a single
# call instead of n_samples calls; otherwise we fall
# through to the per-sample loop.
all_spatial_labels = set(spatial.direction_map.keys())
_promise = self.dynamic_coupling
# --- A: hoist C-propagator lookup out of per-sample loop. ---
c_product = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
t_l = _times(sp_l)
t_r = _times(sp_r)
C_batch = _lookup_C(sp_l, sp_r, t_l, t_r)
a_i = (
DiagramIntegrand._resolve_component(il, fi)
if il is not None else None
)
b_i = (
DiagramIntegrand._resolve_component(ir, fi)
if ir is not None else None
)
c_product *= _select_C_batch(C_batch, a_i, b_i)
# --- B: pre-build per-symbol-leg time / position arrays. ---
label_t = {lab: _times(lab) for lab in all_spatial_labels}
if spatial_aware:
# group_x is computed above (line ~2146) when the
# cache is spatial-aware; reuse it.
label_x = {
lab: group_x[spatial.direction_map[lab]] # type: ignore[index]
for lab in all_spatial_labels
}
else:
label_x = {
lab: float(direction)
for lab in all_spatial_labels
}
couplings = _promise.evaluate_at_batch(
label_t=label_t,
label_x=label_x,
n_samples=n_samples,
)
# ``couplings`` is a (n_samples,) complex/float array of
# the contracted scalar coupling at each sample. The
# NotImplementedError for prop-indexed dynamic is raised
# inside ``evaluate_at_batch``.
values = (
couplings.real * r_product * c_product * jacobians
)
elif not prop_idx:
# Scalar coupling path (iso_R + iso_C or no prop indices)
c_product = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
t_l = _times(sp_l)
t_r = _times(sp_r)
C_diag_batch = _lookup_C(sp_l, sp_r, t_l, t_r)
if il is not None and ir is not None:
a = DiagramIntegrand._resolve_component(il, fi)
b = DiagramIntegrand._resolve_component(ir, fi)
c_product *= _select_C_batch(C_diag_batch, a, b)
else:
c_product *= _select_C_batch(C_diag_batch, None, None)
values = r_product * complex(coeff).real * c_product * jacobians
else:
# Propagator-indexed coupling: loop over index combinations
idx_names = [name for name, _ in prop_idx]
prop_shape = tuple(dim for _, dim in prop_idx)
values = np.zeros(n_samples)
for pidx in np.ndindex(*prop_shape):
c_val = complex(coeff[pidx]).real if coeff.ndim > 0 else complex(coeff).real
if abs(c_val) < 1e-20:
continue
idx_map = {**fi, **dict(zip(idx_names, pidx))}
c_prod = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
t_l = _times(sp_l)
t_r = _times(sp_r)
C_diag_batch = _lookup_C(sp_l, sp_r, t_l, t_r)
a = DiagramIntegrand._resolve_component(il, idx_map)
b = DiagramIntegrand._resolve_component(ir, idx_map)
c_prod *= _select_C_batch(C_diag_batch, a, b)
values += c_val * r_product * c_prod * jacobians
# Mask invalid samples
values = np.where(jacobians > 0, values, 0.0)
estimate = float(np.mean(values))
# Error estimate
n_batches = min(8, n_samples)
batch_size = n_samples // n_batches
batch_means = np.array([
np.mean(values[i * batch_size:(i + 1) * batch_size])
for i in range(n_batches)
])
std_error = float(np.std(batch_means, ddof=1) / np.sqrt(n_batches))
return (estimate, std_error)
[docs]
def integrate_moment_gauss_legendre(
self,
lambda_f: float,
cache: PropagatorCache,
t_min: float = 0.0,
direction: Any = 0,
n_gauss: int = 8,
positions: dict[str, float] | None = None,
integrate_over: Any = None,
) -> tuple[float, float]:
"""Tensor-product Gauss-Legendre quadrature on the causal
simplex.
Mirrors :meth:`integrate_moment_qmc_vectorized` node-for-node
-- the SAME causal-simplex mapping (parents → upper bounds →
Jacobians) and the SAME vectorised batch path through
:meth:`PropagatorCache.R_time_batch` and
:meth:`PropagatorCache.C_at_batch` -- but replaces the Sobol
``n_samples`` quasi-random nodes with the deterministic
``n_gauss^d`` tensor product of 1-D Gauss-Legendre nodes
(mapped from ``[-1, 1]`` to ``[0, 1]``).
For smooth integrands (a finite product of exponentials --
the typical R/C/κ kernel structure) this gives **exponential
convergence in n_gauss**, vastly outperforming Sobol QMC at
modest dimensionality. In particular, demo2's FK channel
(4D smooth integrand on a causal simplex of area
``t_f^2 / 2``) is dominated at large ``t_f`` by a narrow
peak band of area ``~ σ_t/γ`` near the upper-right corner;
Sobol QMC under-resolves it unless ``n_samples`` is enormous,
while ``n_gauss=8`` (4096 nodes for d=4) recovers the
notebook's hand-derived value to ~5 sig figs.
**Cost trade-off.** Tensor-product GL scales as
``n_gauss^d`` -- fine for ``d ≤ 5`` (demo2 FK), fast at
``d ≤ 4``, painful at ``d ≥ 7``. For high-d integrands
prefer ``method='qmc_vectorized'`` with a large ``n_samples``
instead.
Args:
n_gauss: Number of GL nodes per dimension. ``n=8`` is
a good default (handles up to degree-15 polynomial
exactly).
positions, integrate_over: Same as
:meth:`integrate_moment_qmc_vectorized`.
Returns:
``(estimate, 0.0)`` -- GL is deterministic so there is
no statistical error to report. The 0.0 mirrors the
return shape of the QMC variants for downstream code.
"""
from numpy.polynomial.legendre import leggauss
spatial = self.spatial
int_vars_pf = list(reversed(spatial.time_integration_vars))
ext_vars = list(spatial.external_points)
ext_int_set = _resolve_integrate_over(integrate_over, ext_vars)
ext_integrated = [v for v in ext_vars if v in ext_int_set]
ext_fixed = [v for v in ext_vars if v not in ext_int_set]
n_ext_int = len(ext_integrated)
gl_vars = ext_integrated + int_vars_pf
n_total = len(gl_vars)
if n_total == 0:
# No integration variables, but callable couplings can still be
# dynamic after R-absorption aliases their legs to fixed externals.
val = self._evaluate_zero_dimensional(
lambda_f, cache, direction=direction, positions=positions,
)
return (val, 0.0)
if spatial.r_propagators and not cache.model.iso_R:
raise NotImplementedError(
"method='gauss_legendre' currently supports scalar "
"iso_R=True response propagators only. Use method='qmc' "
"or method='qmc_scalar' for matrix-valued R."
)
# 1-D Gauss-Legendre nodes / weights mapped from [-1, 1] to [0, 1].
nodes_1d, weights_1d = leggauss(n_gauss)
u_1d = (nodes_1d + 1) / 2
w_1d = weights_1d / 2
# Tensor product of n_total dims. ``np.meshgrid(..., indexing='ij')``
# gives arrays of shape (n_gauss,) * n_total; ravel and stack to
# (n_pts, n_total).
mesh = np.meshgrid(*([u_1d] * n_total), indexing="ij")
u = np.stack([m.ravel() for m in mesh], axis=-1)
w_mesh = np.meshgrid(*([w_1d] * n_total), indexing="ij")
node_weights = np.prod(
np.stack([wm.ravel() for wm in w_mesh], axis=-1), axis=1
)
n_samples = u.shape[0] # = n_gauss ** n_total
# --- Causal mapping: identical to QMC vectorised path. ---
parent_map: dict[str, list[str]] = defaultdict(list)
for earlier, later in spatial.time_orderings:
if earlier in int_vars_pf:
parent_map[earlier].append(later)
span = lambda_f - t_min
times_arr = np.empty((n_samples, n_total))
jacobians = np.ones(n_samples)
for k in range(n_ext_int):
times_arr[:, k] = t_min + u[:, k] * span
jacobians *= span
for k, var in enumerate(int_vars_pf):
idx = n_ext_int + k
parents = parent_map.get(var, [])
if parents:
hi = np.full(n_samples, lambda_f)
for p in parents:
if p in int_vars_pf:
p_idx = n_ext_int + int_vars_pf.index(p)
hi = np.minimum(hi, times_arr[:, p_idx])
elif p in ext_integrated:
p_idx = ext_integrated.index(p)
hi = np.minimum(hi, times_arr[:, p_idx])
else:
hi = np.minimum(hi, lambda_f)
else:
hi = np.full(n_samples, lambda_f)
width = hi - t_min
valid = width > 0
times_arr[:, idx] = np.where(valid, t_min + u[:, idx] * width, t_min)
jacobians = np.where(valid, jacobians * width, 0.0)
var_to_col = {var: i for i, var in enumerate(gl_vars)}
fixed_t = np.full(n_samples, lambda_f)
# Equal-time alias: aliased K-vertex legs share the canonical
# representative's time variable. Resolved transparently inside
# ``_times`` so r-propagator, c-propagator, and dynamic-coupling
# call sites all see the collapsed time.
_et_alias = dict(spatial.equal_time_aliases or ())
def _times(var: str) -> np.ndarray:
var = _et_alias.get(var, var)
col = var_to_col.get(var)
if col is not None:
return times_arr[:, col]
return fixed_t
# --- Vectorised integrand evaluation: identical to QMC path. ---
dt = self.diagram_term
coeff = self.coupling_array
prop_idx = dt.propagator_indices
r_product = np.ones(n_samples)
for sl, sr in _kept_r_propagators(spatial):
r_product *= cache.R_time_batch(_times(sl), _times(sr))
fi = self.fixed_indices
spatial_aware = _cache_has_spatial_table(cache)
group_x: dict[str, float] | None = None
if spatial_aware:
group_x = self._resolve_group_x(spatial, positions, direction)
def _lookup_C(sp_l, sp_r, t_l, t_r):
if not spatial_aware:
return cache.C_diagonal_batch(t_l, t_r)
x_l = group_x[spatial.direction_map[sp_l]] # type: ignore[index]
x_r = group_x[spatial.direction_map[sp_r]] # type: ignore[index]
return cache.C_at_batch(t_l, t_r, x_l, x_r)
if self.dynamic_coupling is not None:
all_spatial_labels = set(spatial.direction_map.keys())
_promise = self.dynamic_coupling
c_product = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
t_l = _times(sp_l)
t_r = _times(sp_r)
C_batch = _lookup_C(sp_l, sp_r, t_l, t_r)
a_i = (
DiagramIntegrand._resolve_component(il, fi)
if il is not None else None
)
b_i = (
DiagramIntegrand._resolve_component(ir, fi)
if ir is not None else None
)
c_product *= _select_C_batch(C_batch, a_i, b_i)
label_t = {lab: _times(lab) for lab in all_spatial_labels}
if spatial_aware:
label_x = {
lab: group_x[spatial.direction_map[lab]] # type: ignore[index]
for lab in all_spatial_labels
}
else:
label_x = {
lab: float(direction)
for lab in all_spatial_labels
}
couplings = _promise.evaluate_at_batch(
label_t=label_t,
label_x=label_x,
n_samples=n_samples,
)
values = couplings.real * r_product * c_product * jacobians
elif not prop_idx:
c_product = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
t_l = _times(sp_l)
t_r = _times(sp_r)
C_diag_batch = _lookup_C(sp_l, sp_r, t_l, t_r)
if il is not None and ir is not None:
a = DiagramIntegrand._resolve_component(il, fi)
b = DiagramIntegrand._resolve_component(ir, fi)
c_product *= _select_C_batch(C_diag_batch, a, b)
else:
c_product *= _select_C_batch(C_diag_batch, None, None)
values = r_product * complex(coeff).real * c_product * jacobians
else:
idx_names = [name for name, _ in prop_idx]
prop_shape = tuple(dim for _, dim in prop_idx)
values = np.zeros(n_samples)
for pidx in np.ndindex(*prop_shape):
c_val = (
complex(coeff[pidx]).real if coeff.ndim > 0
else complex(coeff).real
)
if abs(c_val) < 1e-20:
continue
idx_map = {**fi, **dict(zip(idx_names, pidx))}
c_prod = np.ones(n_samples)
for sp_l, sp_r, il, ir in spatial.c_propagators:
t_l = _times(sp_l)
t_r = _times(sp_r)
C_diag_batch = _lookup_C(sp_l, sp_r, t_l, t_r)
a = DiagramIntegrand._resolve_component(il, idx_map)
b = DiagramIntegrand._resolve_component(ir, idx_map)
c_prod *= _select_C_batch(C_diag_batch, a, b)
values += c_val * r_product * c_prod * jacobians
# --- GL aggregation: weighted sum (NOT mean). ---
# Each tensor-product node carries weight ``w_i = prod_d w_1d[i_d]``.
# The integrand on the unit cube is therefore
# integral = sum_i (values[i] * w_i)
# where ``values[i]`` already includes the causal-simplex
# Jacobian. Compare to QMC which uses ``mean(values) =
# sum(values) / n_samples`` (Monte Carlo estimator).
values = np.where(jacobians > 0, values, 0.0)
estimate = float(np.sum(values * node_weights))
return (estimate, 0.0)
[docs]
def integrate_moment_nquad(
self,
lambda_f: float,
cache: PropagatorCache,
t_min: float = 0.0,
direction: Any = 0,
positions: dict[str, float] | None = None,
integrate_over: Any = None,
) -> tuple[float, float]:
"""Integrate over time variables using nested adaptive
quadrature.
See :meth:`integrate_moment_qmc_vectorized` for the
``integrate_over`` kwarg semantics (external-time partition
into integrated vs fixed-at-``lambda_f``).
"""
from scipy.integrate import nquad as _nquad
spatial = self.spatial
int_vars = list(spatial.time_integration_vars)
ext_vars = list(spatial.external_points)
ext_int_set = _resolve_integrate_over(integrate_over, ext_vars)
ext_integrated = [v for v in ext_vars if v in ext_int_set]
ext_fixed = [v for v in ext_vars if v not in ext_int_set]
# Quadrature variables = internals + integrated externals.
all_vars = int_vars + ext_integrated
n_int = len(int_vars)
n_total = len(all_vars)
if _cache_has_spatial_table(cache):
directions = self._resolve_group_x(spatial, positions, direction)
else:
dir_vars = set(spatial.direction_map.values())
directions = {d: direction for d in dir_vars}
fixed_times = {v: lambda_f for v in ext_fixed}
if n_total == 0:
val = self._evaluate_zero_dimensional(
lambda_f, cache, direction=direction, positions=positions,
)
return (val, 0.0)
# Spacetime-dependent (callable) couplings are NOT supported
# by the nquad path: ``self.evaluate`` uses the static
# ``coupling_array`` which is a placeholder zero array when
# the integrand was built from a callable κ. Multiplying by
# 0 would silently return 0 for every diagram with a
# dynamic vertex (a latent bug pre-2026-04). We refuse
# explicitly and point to ``method='gauss_legendre'`` -- a
# tensor-product GL rule with deterministic exponential
# convergence on smooth integrands, which is the natural
# match for diagrams that have callable couplings (and
# vastly outperforms 4D adaptive nquad in practice anyway).
if self.dynamic_coupling is not None:
raise NotImplementedError(
"method='nquad' does not support spacetime-dependent "
"(callable) couplings. Use method='gauss_legendre' "
"(deterministic, exponential convergence on smooth "
"integrands, matches the notebook hand-derivation) "
"or method='qmc_vectorized' (Sobol QMC, recommended "
"for high-d diagrams) instead."
)
def f(*args: float) -> float:
times = dict(fixed_times)
for i, var in enumerate(all_vars):
times[var] = args[i]
result = self.evaluate(times, directions, cache)
return result.real if result.imag == 0 else abs(result)
# Causal bounds for internal vars
upper_bounds: dict[str, list[str]] = defaultdict(list)
for earlier, later in spatial.time_orderings:
if earlier in int_vars:
upper_bounds[earlier].append(later)
ranges: list = []
for i, var in enumerate(all_vars):
if i >= n_int:
ranges.append((t_min, lambda_f))
else:
ub_sources = upper_bounds.get(var, [])
if not ub_sources:
ranges.append((t_min, lambda_f))
else:
# Fixed externals contribute a constant upper bound
# ``lambda_f`` (no longer showing up in later_args).
ub_dyn = [src for src in ub_sources
if src not in fixed_times]
def make_bound(
ub: list[str], avars: list[str],
lo: float, hi: float, cur_i: int,
) -> Callable:
def bound_func(*later_args: float) -> tuple[float, float]:
hi_vals = [hi]
for src in ub:
j = avars.index(src) - cur_i - 1
if 0 <= j < len(later_args):
hi_vals.append(later_args[j])
else:
hi_vals.append(hi)
return (lo, min(hi_vals))
return bound_func
ranges.append(
make_bound(ub_dyn, all_vars, t_min, lambda_f, i)
)
val, err = _nquad(f, ranges)
return (val, err)
def _cache_supports_batch_c(cache: "PropagatorCache") -> bool:
"""Whether ``cache`` can evaluate C propagators in batch.
True when the cache either has a pre-computed spline table
(``_c_splines is not None`` on :class:`PropagatorCache`) or is a
custom cache with a native batch implementation that sets
``_c_splines`` to a truthy sentinel (e.g. the analytical cache
used in ``examples/demo1`` and ``tests/test_deductive_numerics``).
"""
return getattr(cache, "_c_splines", None) is not None
def _cache_has_spatial_table(cache: "PropagatorCache") -> bool:
"""Whether ``cache`` has been equipped with any spatial
(x-aware) table — full or lazy, any homogeneity kind.
Used by the integrators to decide whether to route each C
propagator through the spatial-aware ``C_at_batch`` path or the
legacy ``C_diagonal_batch`` path (which ignores x). False when
the cache was built with only the legacy ``precompute_C_table``
or when it is a custom cache that doesn't implement the new
spatial attributes — in both cases the legacy path is
bit-identical to pre-Phase-5 behaviour.
"""
if getattr(cache, "_closed_form_only", False):
# Closed-form-only mode: no spline at all, but the
# ``C_at_batch`` override IS spatial-aware (it routes the
# per-sample positions straight into the user's c_fn).
return True
names = (
"_c_translation_splines",
"_lazy_translation",
"_c_rotation_splines",
"_lazy_rotation",
"_c_general_interpolators",
"_lazy_general",
)
return any(getattr(cache, n, None) is not None for n in names)
[docs]
def integrate_moment(
integrand: DiagramIntegrand,
lambda_f: float,
cache: PropagatorCache,
t_min: float = 0.0,
direction: Any = 0,
method: str = "qmc",
n_samples: int = 2**14,
seed: int | None = None,
positions: dict[str, float] | None = None,
integrate_over: Any = None,
n_gauss: int = 8,
) -> tuple[float, float]:
"""Integrate a diagram's contribution over all time variables.
Convenience wrapper around
:meth:`DiagramIntegrand.integrate_moment_qmc`,
:meth:`DiagramIntegrand.integrate_moment_qmc_vectorized`, and
:meth:`DiagramIntegrand.integrate_moment_nquad`.
**Dispatch logic.** With ``method='qmc'`` (default) the function
auto-selects the fastest QMC path compatible with the supplied
cache:
- If the cache supports batched C evaluation (either
``PropagatorCache.precompute_C_table`` has been called, or a
custom cache implements ``C_diagonal_batch`` natively) and
``cache.model.iso_R`` is true →
:meth:`integrate_moment_qmc_vectorized` (~200× faster than
the scalar loop on typical workloads).
- Otherwise → :meth:`integrate_moment_qmc` (scalar Python loop).
Users who explicitly want the scalar loop (e.g. for debugging
or with a non-batch-capable custom cache) can pass
``method='qmc_scalar'``.
For best performance, call
:meth:`PropagatorCache.precompute_C_table` before integrating so
the vectorised path is selected automatically.
Args:
integrand: A :class:`DiagramIntegrand` built from a
:class:`~sft_wick.perturbation.DiagramTerm`.
lambda_f: Upper integration limit for external times.
cache: A :class:`PropagatorCache` (or compatible custom
cache).
t_min: Lower time bound (default 0).
direction: Direction value for all spatial points.
method: ``'qmc'`` (default, auto-selects vectorised vs
scalar), ``'qmc_scalar'`` (force scalar loop),
``'qmc_vectorized'`` (force vectorised; raises if cache
doesn't support batch), or ``'nquad'`` (nested adaptive
quadrature).
n_samples: Number of Sobol samples (QMC only, should be
a power of 2).
seed: Random seed for reproducibility (QMC only).
Returns:
``(estimate, error)`` tuple.
"""
if method == "qmc":
if cache.model.iso_R and _cache_supports_batch_c(cache):
return integrand.integrate_moment_qmc_vectorized(
lambda_f, cache, t_min=t_min, direction=direction,
n_samples=n_samples, seed=seed, positions=positions,
integrate_over=integrate_over,
)
return integrand.integrate_moment_qmc(
lambda_f, cache, t_min=t_min, direction=direction,
n_samples=n_samples, seed=seed, positions=positions,
integrate_over=integrate_over,
)
elif method == "qmc_vectorized":
return integrand.integrate_moment_qmc_vectorized(
lambda_f, cache, t_min=t_min, direction=direction,
n_samples=n_samples, seed=seed, positions=positions,
integrate_over=integrate_over,
)
elif method == "qmc_scalar":
return integrand.integrate_moment_qmc(
lambda_f, cache, t_min=t_min, direction=direction,
n_samples=n_samples, seed=seed, positions=positions,
integrate_over=integrate_over,
)
elif method == "nquad":
return integrand.integrate_moment_nquad(
lambda_f, cache, t_min=t_min, direction=direction,
positions=positions,
integrate_over=integrate_over,
)
elif method == "gauss_legendre":
return integrand.integrate_moment_gauss_legendre(
lambda_f, cache, t_min=t_min, direction=direction,
n_gauss=n_gauss, positions=positions,
integrate_over=integrate_over,
)
else:
raise ValueError(
f"Unknown method {method!r}; use 'qmc', 'qmc_scalar', "
f"'qmc_vectorized', 'nquad', or 'gauss_legendre'"
)
def _eval_single_diagram(
dt, coupling_values, fixed_indices, lambda_f, cache, t_min, direction,
method, n_samples, seed, positions, integrate_over, n_gauss,
):
"""Evaluate one diagram term end-to-end (build integrand + integrate).
Top-level function so it is picklable for multiprocessing.
"""
ig = dt.build_integrand(coupling_values, fixed_indices)
return integrate_moment(
ig, lambda_f, cache,
t_min=t_min, direction=direction,
method=method, n_samples=n_samples, seed=seed,
positions=positions, integrate_over=integrate_over,
n_gauss=n_gauss,
)
[docs]
def integrate_diagrams(
diagram_terms: list,
coupling_values: dict,
lambda_f: float,
cache: "PropagatorCache",
t_min: float = 0.0,
direction: Any = 0,
method: str = "qmc",
n_samples: int = 2**14,
seed: int | None = None,
fixed_indices: dict[str, int] | None = None,
n_jobs: int = 1,
positions: dict[str, Any] | None = None,
integrate_over: Any = None,
n_gauss: int = 8,
) -> tuple[float, list[tuple[float, float]]]:
"""Integrate a batch of diagram terms, optionally in parallel.
Builds integrands and evaluates all diagrams, returning the total
and per-diagram results. When ``n_jobs != 1``, diagrams are
evaluated in parallel using :mod:`joblib`.
.. note::
``n_jobs`` defaults to ``1`` (sequential), matching the L1
``Expansion.evaluate`` / ``Expansion.sweep`` defaults. Pass
``-1`` to use all CPU cores. Sequential and parallel paths
are bit-identical when the QMC seed is fixed.
Args:
diagram_terms: List of :class:`~sft_wick.perturbation.DiagramTerm`.
coupling_values: Coupling tensor arrays (e.g. ``{'F': F_code}``).
lambda_f: Upper integration limit.
cache: A :class:`PropagatorCache`.
t_min: Lower time bound (default 0).
direction: Direction value for all spatial points.
method: ``'qmc'`` or ``'nquad'``.
n_samples: Number of Sobol samples (QMC only).
seed: Random seed (QMC only).
fixed_indices: Optional pinned index values for
:meth:`~sft_wick.perturbation.DiagramTerm.build_integrand`.
n_jobs: Number of parallel jobs. ``1`` = sequential
(default, safe under nested-callers), ``-1`` = all CPU
cores. A built-in guard falls
back to sequential for ``len(diagram_terms) <= 2`` to
avoid joblib's ~1 s startup overhead on trivial batches.
Requires :mod:`joblib` when ``n_jobs != 1``.
Returns:
``(total, details)`` where *total* is the scalar sum and
*details* is a list of ``(estimate, error)`` per diagram.
"""
if not diagram_terms:
return (0.0, [])
if n_jobs == 1 or len(diagram_terms) <= 2:
# Sequential — no overhead
details = []
for dt in diagram_terms:
ig = dt.build_integrand(coupling_values, fixed_indices)
val, err = integrate_moment(
ig, lambda_f, cache,
t_min=t_min, direction=direction,
method=method, n_samples=n_samples, seed=seed,
positions=positions, integrate_over=integrate_over,
n_gauss=n_gauss,
)
details.append((val, err))
else:
# Parallel via joblib
from joblib import Parallel, delayed
results = Parallel(n_jobs=n_jobs, backend="loky")(
delayed(_eval_single_diagram)(
dt, coupling_values, fixed_indices,
lambda_f, cache, t_min, direction,
method, n_samples, seed, positions, integrate_over,
n_gauss,
)
for dt in diagram_terms
)
details = list(results)
total = sum(v for v, _ in details)
return (total, details)
# ---------------------------------------------------------------------------
# Two-point correlation function integration
# ---------------------------------------------------------------------------
[docs]
def integrate_two_point_qmc(
integrands: list["DiagramIntegrand"],
t_f: float,
positions: dict[str, float],
cache: "PropagatorCache",
t_min: float = 0.0,
n_samples: int = 2**14,
seed: int | None = None,
) -> tuple[float, float]:
"""Vectorised QMC integration for two-point correlation functions.
Evaluates the sum of diagram integrands at external time *t_f*
with spatial points at the positions given by *positions*. This
is the standard entry point for computing correlators such as
``⟨φ(x, t) φ(y, t)⟩`` where *x* and *y* may differ.
Unlike :func:`integrate_moment`, which assigns a single direction
value to all spatial points, this function supports per-point
spatial positions and applies the appropriate spatial factor to
each correlation propagator automatically.
The spatial factor for a *C*-propagator connecting points at
positions *n₁* and *n₂* is computed as::
κ²(n₁, t_ref; n₂, t_ref) / κ²(0, t_ref; 0, t_ref)
which is exact for separable kernels (where the factor is
time-independent) and a good approximation otherwise.
Requires :meth:`PropagatorCache.precompute_C_table` to have been
called for the batch *C* evaluation fast path.
Args:
integrands: List of :class:`DiagramIntegrand` objects (one per
non-vanishing Feynman diagram).
t_f: External (observation) time — all external points are set
to this time.
positions: ``{point_name: position}`` mapping each spatial
point name (e.g. ``"x"``, ``"y"``) to a scalar spatial
coordinate. Points not in the dict default to 0.
cache: A :class:`PropagatorCache` with pre-computed *C* table.
t_min: Lower time bound (default 0).
n_samples: Number of Sobol samples (should be a power of 2).
seed: Random seed for the Sobol sequence.
Returns:
``(estimate, std_error)`` for the summed two-point function.
"""
from scipy.stats import qmc as _qmc
total = 0.0
total_err_sq = 0.0
for ig in integrands:
sp = ig.spatial
ivs = list(reversed(sp.time_integration_vars))
evs = list(sp.external_points)
# Build direction dict from positions.
# External points (which carry user-specified positions) take
# priority over vertex points (which default to 0).
directions: dict[str, float] = {}
for pt in evs:
dvar = sp.direction_map.get(pt)
if dvar is not None and pt in positions:
directions[dvar] = positions[pt]
for pt, dvar in sp.direction_map.items():
if dvar not in directions:
directions[dvar] = positions.get(pt, 0.0)
# Pre-compute spatial factors for each C propagator
# factor = kappa2(n1, t_ref, n2, t_ref) / kappa2(0, t_ref, 0, t_ref)
t_ref = max(t_min + 0.1, t_f * 0.5)
model = cache.model
kappa_00 = model.kappa2(
np.array([0.0]), t_ref, np.array([0.0]), t_ref
) # (N, N)
c_spatial_factors = []
for sp_l, sp_r, _il, _ir in sp.c_propagators:
dir_l = sp.direction_map[sp_l]
dir_r = sp.direction_map[sp_r]
n_l = directions.get(dir_l, 0.0)
n_r = directions.get(dir_r, 0.0)
if abs(n_l - n_r) < 1e-15:
c_spatial_factors.append(np.ones(model.n_components))
else:
kappa_sep = model.kappa2(
np.array([n_l]), t_ref, np.array([n_r]), t_ref
)
# Diagonal ratio per component
diag_00 = np.diag(kappa_00)
diag_sep = np.diag(kappa_sep)
safe = np.abs(diag_00) > 1e-30
factor = np.where(safe, diag_sep / diag_00, 0.0)
c_spatial_factors.append(factor)
# --- Zero integration variables: direct evaluation ---
et = {v: t_f for v in evs}
ni = len(ivs)
if ni == 0:
val = ig.evaluate(et, directions, cache)
total += val.real
continue
# --- Causal parent map ---
pm: dict[str, list[str]] = defaultdict(list)
for earlier, later in sp.time_orderings:
if earlier in ivs:
pm[earlier].append(later)
# --- Sobol samples ---
u = _qmc.Sobol(d=ni, seed=seed).random(n_samples)
t_s = np.zeros((n_samples, ni))
jac = np.ones(n_samples)
for k, var in enumerate(ivs):
ps = pm.get(var, [])
pi = [ivs.index(p) for p in ps if p in ivs]
ep = [t_f for p in ps if p not in ivs]
hi = np.min(t_s[:, pi], axis=1) if pi else np.full(n_samples, t_f)
if ep:
hi = np.minimum(hi, min(ep))
w = hi - t_min
ok = w > 0
t_s[:, k] = np.where(ok, t_min + u[:, k] * w, t_min)
jac = np.where(ok, jac * w, 0.0)
# --- Full time array ---
all_vars = evs + ivs
var_col = {v: j for j, v in enumerate(all_vars)}
t_arr = np.empty((n_samples, len(all_vars)))
for j in range(len(evs)):
t_arr[:, j] = t_f
for j in range(ni):
t_arr[:, len(evs) + j] = t_s[:, j]
# --- Vectorised R product ---
r_prod = np.ones(n_samples)
for sl, sr in _kept_r_propagators(sp):
r_prod *= cache.R_time_batch(
t_arr[:, var_col[sl]], t_arr[:, var_col[sr]]
)
# --- Vectorised C product with spatial factors ---
dt = ig.diagram_term
coeff = ig.coupling_array
prop_idx = dt.propagator_indices
# Fixed indices from the integrand (e.g. observable component
# indices like {'a': 0, 'b': 1}). These must participate in
# _resolve_component so that C-propagator legs carrying a fixed
# index name are evaluated at the correct component instead of
# being summed over all components.
fi = ig.fixed_indices
if not prop_idx:
# Scalar coupling (iso_R + iso_C)
c_prod = np.ones(n_samples)
for ci, (sp_l, sp_r, il, ir) in enumerate(sp.c_propagators):
t_l = t_arr[:, var_col[sp_l]]
t_r = t_arr[:, var_col[sp_r]]
C_diag = cache.C_diagonal_batch(t_l, t_r)
sf = c_spatial_factors[ci]
a = DiagramIntegrand._resolve_component(il, fi)
if a is not None:
c_prod *= C_diag[:, a] * sf[a]
else:
c_prod *= (C_diag * sf[np.newaxis, :]).sum(axis=1)
values = r_prod * complex(coeff).real * c_prod * jac
else:
# Propagator-indexed coupling
idx_names = [name for name, _ in prop_idx]
prop_shape = tuple(dim for _, dim in prop_idx)
values = np.zeros(n_samples)
for pidx in np.ndindex(*prop_shape):
c_val = (
complex(coeff[pidx]).real
if coeff.ndim > 0
else complex(coeff).real
)
if abs(c_val) < 1e-20:
continue
idx_map = {**fi, **dict(zip(idx_names, pidx))}
c_prod = np.ones(n_samples)
for ci, (sp_l, sp_r, il, ir) in enumerate(
sp.c_propagators
):
t_l = t_arr[:, var_col[sp_l]]
t_r = t_arr[:, var_col[sp_r]]
C_diag = cache.C_diagonal_batch(t_l, t_r)
sf = c_spatial_factors[ci]
a = DiagramIntegrand._resolve_component(il, idx_map)
b = DiagramIntegrand._resolve_component(ir, idx_map)
if a is not None and b is not None:
c_prod *= C_diag[:, a] * sf[a]
else:
c_prod *= (C_diag * sf[np.newaxis, :]).sum(axis=1)
values += c_val * r_prod * c_prod * jac
values = np.where(jac > 0, values, 0.0)
est = float(np.mean(values))
total += est
# Error from 8 sub-batches
bs = n_samples // 8
bm = np.array(
[np.mean(values[j * bs : (j + 1) * bs]) for j in range(8)]
)
total_err_sq += (float(np.std(bm, ddof=1) / np.sqrt(8))) ** 2
return (total, np.sqrt(total_err_sq))