Coverage for src / chebpy / gpr.py: 100%
172 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-22 21:33 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-22 21:33 +0000
1"""Gaussian process regression with Chebfun representations.
3Implements Gaussian process regression (GPR) following the algorithm described
4in Rasmussen & Williams, *Gaussian Processes for Machine Learning*, MIT Press,
52006, and the MATLAB Chebfun ``gpr.m`` by The University of Oxford and The
6Chebfun Developers.
8The posterior mean, variance, and (optionally) random samples from the
9posterior are all returned as Chebfun / Quasimatrix objects so that they can
10be manipulated with the full ChebPy toolkit (differentiation, integration,
11rootfinding, etc.).
13Reference:
14 C. E. Rasmussen & C. K. I. Williams, "Gaussian Processes for Machine
15 Learning", MIT Press, 2006.
16"""
18from __future__ import annotations
20from collections.abc import Callable
21from dataclasses import dataclass, field
23import numpy as np
24from numpy.typing import ArrayLike
26from .algorithms import chebpts2
27from .chebfun import Chebfun
28from .quasimatrix import Quasimatrix
31# ---------------------------------------------------------------------------
32# Options container
33# ---------------------------------------------------------------------------
@dataclass
class _GPROptions:
    """Parsed options for a GPR call."""

    # Signal standard deviation of the kernel (appears as sigma**2 in k).
    sigma: float = 1.0
    # True when the caller supplied sigma explicitly (skips data rescaling).
    sigma_given: bool = False
    # Kernel length-scale L; 0.0 means "not yet chosen".
    length_scale: float = 0.0
    # Observation-noise standard deviation; 0.0 selects the jitter-only path.
    noise: float = 0.0
    # Output domain [a, b] for the returned Chebfuns.
    domain: np.ndarray = field(default_factory=lambda: np.array([-1.0, 1.0]))
    # Use the periodic squared-exponential kernel when True.
    trig: bool = False
    # Number of posterior samples requested (0 => mean/variance only).
    n_samples: int = 0
47# ---------------------------------------------------------------------------
48# Kernel helpers
49# ---------------------------------------------------------------------------
52def _kernel_matrix(
53 x1: np.ndarray,
54 x2: np.ndarray,
55 opts: _GPROptions,
56) -> np.ndarray:
57 """Evaluate the covariance kernel k(x1_i, x2_j) for all pairs."""
58 r = x1[:, None] - x2[None, :]
59 if opts.trig:
60 period = opts.domain[1] - opts.domain[0]
61 return opts.sigma**2 * np.exp(-2.0 / opts.length_scale**2 * np.sin(np.pi / period * r) ** 2)
62 return opts.sigma**2 * np.exp(-0.5 / opts.length_scale**2 * r**2)
65def _log_marginal_likelihood(
66 length_scale: float | np.ndarray,
67 x: np.ndarray,
68 y: np.ndarray,
69 opts: _GPROptions,
70) -> float | np.ndarray:
71 """Negative log marginal likelihood (eq. 2.30 in Rasmussen & Williams).
73 Accepts scalar or array *length_scale* so that it can be wrapped as a
74 Chebfun for optimisation.
75 """
76 scalar_input = np.ndim(length_scale) == 0
77 ls = np.atleast_1d(np.asarray(length_scale, dtype=float))
78 n = len(x)
79 rx = x[:, None] - x[None, :]
80 result = np.empty_like(ls)
82 for idx in np.ndindex(ls.shape):
83 l_val = ls[idx]
84 if opts.trig:
85 period = opts.domain[1] - opts.domain[0]
86 cov_mat = opts.sigma**2 * np.exp(-2.0 / l_val**2 * np.sin(np.pi / period * rx) ** 2)
87 else:
88 cov_mat = opts.sigma**2 * np.exp(-0.5 / l_val**2 * rx**2)
90 if opts.noise != 0:
91 cov_mat += opts.noise**2 * np.eye(n)
92 else:
93 cov_mat += 1e-15 * n * opts.sigma**2 * np.eye(n)
95 chol_l = np.linalg.cholesky(cov_mat)
96 alpha = np.linalg.solve(chol_l.T, np.linalg.solve(chol_l, y))
97 lml = -0.5 * y @ alpha - np.sum(np.log(np.diag(chol_l))) - 0.5 * n * np.log(2 * np.pi)
98 result[idx] = lml
100 return float(result.item()) if scalar_input else result.ravel()
103# ---------------------------------------------------------------------------
104# Length-scale selection via max log marginal likelihood
105# ---------------------------------------------------------------------------
def _select_length_scale(x: np.ndarray, y: np.ndarray, opts: _GPROptions) -> float:
    """Choose the length-scale that maximises the log marginal likelihood.

    Args:
        x: Observation locations.
        y: Observation values (already scaled as the caller requires).
        opts: Kernel options; ``opts.length_scale`` itself is ignored here.

    Returns:
        The length-scale within the search bracket that maximises
        ``_log_marginal_likelihood``.
    """
    n = len(x)
    dom_size = opts.domain[1] - opts.domain[0]

    # Initial bracket [lo, hi]: the lower bound shrinks with the number of
    # data points so closely-spaced data can get a short length-scale.
    if opts.trig:
        lo, hi = 1.0 / (2 * n), 10.0
    else:
        lo, hi = dom_size / (2 * np.pi * n), 10.0 / np.pi * dom_size

    # Heuristic: shrink the right end of the search domain if the lml is
    # monotonically decreasing (mirrors the MATLAB implementation).
    f1 = float(_log_marginal_likelihood(lo, x, y, opts))
    f2 = float(_log_marginal_likelihood(hi, x, y, opts))
    while f1 > f2 and hi / lo > 1 + 1e-4:
        # Probe a point near the left end; if it already beats f1 the
        # likelihood is not monotone on [lo, hi], so keep the bracket.
        new_bound = lo + (hi - lo) / 10.0
        f_new = float(_log_marginal_likelihood(new_bound, x, y, opts))
        if f_new > f1:
            break
        hi = new_bound
        f2 = f_new

    # Maximise over the bracket with golden-section search.
    return _golden_section_max(lambda ls: float(_log_marginal_likelihood(ls, x, y, opts)), lo, hi)
134def _golden_section_max(f: Callable[[float], float], a: float, b: float, tol: float = 1e-6) -> float:
135 """Golden-section search for the scalar argmax of *f* on [a, b]."""
136 gr = (np.sqrt(5.0) + 1.0) / 2.0
137 c = b - (b - a) / gr
138 d = a + (b - a) / gr
139 while abs(b - a) > tol * (abs(a) + abs(b)):
140 if f(c) > f(d):
141 b = d
142 else:
143 a = c
144 c = b - (b - a) / gr
145 d = a + (b - a) / gr
146 return 0.5 * (a + b)
149# ---------------------------------------------------------------------------
150# Public API
151# ---------------------------------------------------------------------------
def _parse_inputs(
    x: ArrayLike,
    y: ArrayLike,
    *,
    sigma: float | None,
    noise: float,
    trig: bool,
    n_samples: int,
) -> tuple[np.ndarray, np.ndarray, _GPROptions, float]:
    """Validate inputs and build the initial options container.

    Args:
        x: Observation locations (flattened to 1-D floats).
        y: Observation values; must match the length of *x*.
        sigma: Caller-supplied signal standard deviation, or ``None`` to
            derive it from ``max(|y|)``.
        noise: Observation-noise standard deviation.
        trig: Whether the periodic kernel variant is requested.
        n_samples: Number of posterior samples requested.

    Returns:
        ``(x_arr, y_arr, opts, scaling_factor)``.

    Raises:
        ValueError: If *x* and *y* have different lengths.
    """
    x_arr = np.asarray(x, dtype=float).ravel()
    y_arr = np.asarray(y, dtype=float).ravel()
    if x_arr.shape != y_arr.shape:
        msg = "x and y must have the same length."
        raise ValueError(msg)

    opts = _GPROptions(trig=trig, noise=noise, n_samples=n_samples)

    scaling_factor = 1.0
    if sigma is not None:
        opts.sigma = sigma
        opts.sigma_given = True
    else:
        if len(y_arr) > 0:
            scaling_factor = float(np.max(np.abs(y_arr)))
        if scaling_factor == 0.0:
            # All-zero observations would give sigma == 0, making the
            # covariance (and its sigma-scaled jitter) identically zero and
            # crashing the downstream Cholesky. Fall back to the unit scale.
            scaling_factor = 1.0
        opts.sigma_given = False
        opts.sigma = scaling_factor

    return x_arr, y_arr, opts, scaling_factor
188def _infer_domain(
189 x_arr: np.ndarray,
190 opts: _GPROptions,
191 domain: tuple[float, float] | list[float] | np.ndarray | None,
192) -> None:
193 """Set ``opts.domain`` from *domain* or from the observation locations."""
194 if domain is not None:
195 opts.domain = np.asarray(domain, dtype=float)
196 elif len(x_arr) == 0:
197 opts.domain = np.array([-1.0, 1.0])
198 elif len(x_arr) == 1:
199 opts.domain = np.array([x_arr[0] - 1, x_arr[0] + 1])
200 elif opts.trig:
201 span = float(np.max(x_arr) - np.min(x_arr))
202 opts.domain = np.array([float(np.min(x_arr)), float(np.max(x_arr)) + 0.1 * span])
203 else:
204 opts.domain = np.array([float(np.min(x_arr)), float(np.max(x_arr))])
def _infer_length_scale(
    x_arr: np.ndarray,
    y_arr: np.ndarray,
    opts: _GPROptions,
    scaling_factor: float,
    length_scale: float | None,
) -> None:
    """Set ``opts.length_scale`` — user-supplied or auto-selected."""
    if length_scale is not None:
        # Caller picked the length-scale; nothing to optimise.
        opts.length_scale = length_scale
        return
    if len(x_arr) == 0:
        # No data to optimise against; use the unit default.
        opts.length_scale = 1.0
        return

    if opts.sigma_given:
        # Optimise against the raw data with the caller's sigma.
        optimiser_opts = _GPROptions(
            sigma=opts.sigma,
            sigma_given=True,
            noise=opts.noise,
            domain=opts.domain,
            trig=opts.trig,
        )
        target = y_arr
    else:
        # Optimise against normalised data with unit sigma; rescale the
        # noise level to match (guarding against a zero scale).
        nonzero_scale = scaling_factor != 0
        optimiser_opts = _GPROptions(
            sigma=1.0,
            sigma_given=True,
            noise=opts.noise / scaling_factor if nonzero_scale else opts.noise,
            domain=opts.domain,
            trig=opts.trig,
        )
        target = y_arr / scaling_factor if nonzero_scale else y_arr

    opts.length_scale = _select_length_scale(x_arr, target, optimiser_opts)
def _posterior_chebfuns(
    x_arr: np.ndarray,
    y_arr: np.ndarray,
    opts: _GPROptions,
    scaling_factor: float,
    n_samples: int,
) -> tuple[Chebfun, Chebfun] | tuple[Chebfun, Chebfun, Quasimatrix]:
    """Compute posterior mean, variance, and optional samples as Chebfuns.

    Args:
        x_arr: Observation locations (non-empty).
        y_arr: Observation values, same length as *x_arr*.
        opts: Fully populated options (sigma, length_scale, noise, domain).
        scaling_factor: Data scale used to size the jitter when noise is 0.
        n_samples: Number of posterior draws; <= 0 returns mean and variance
            only.

    Returns:
        ``(f_mean, f_var)``, or ``(f_mean, f_var, samples)`` when
        ``n_samples > 0``.
    """
    n = len(x_arr)
    cov_mat = _kernel_matrix(x_arr, x_arr, opts)
    # Regularise the diagonal: observation noise if given, otherwise a tiny
    # data-scaled jitter so the Cholesky factorisation succeeds.
    if opts.noise == 0:
        cov_mat += 1e-15 * scaling_factor**2 * n * np.eye(n)
    else:
        cov_mat += opts.noise**2 * np.eye(n)

    # alpha = K^{-1} y via two triangular solves (R&W algorithm 2.1).
    chol_l = np.linalg.cholesky(cov_mat)
    alpha = np.linalg.solve(chol_l.T, np.linalg.solve(chol_l, y_arr))

    # Shared Chebyshev grid
    sample_size = min(20 * n, 2000)
    t = chebpts2(sample_size)
    # Map the grid from [-1, 1] onto opts.domain.
    x_sample = 0.5 * (opts.domain[1] - opts.domain[0]) * t + 0.5 * (opts.domain[0] + opts.domain[1])

    # NOTE(review): exact float comparison — only grid points bit-identical
    # to observation locations are flagged; confirm this is intended.
    in_x = np.isin(x_sample, x_arr)

    k_star = _kernel_matrix(x_sample, x_arr, opts)
    if opts.noise:
        # Where a grid point coincides exactly with a datum, include the
        # noise variance so the posterior matches the noisy observation model.
        k_star += opts.noise**2 * (np.abs(x_sample[:, None] - x_arr[None, :]) == 0)

    # Posterior mean: k_* @ alpha on the fixed grid. The lambda ignores its
    # argument and returns the precomputed grid values.
    mean_vals = k_star @ alpha
    f_mean = Chebfun.initfun_fixedlen(lambda _z: mean_vals, sample_size, opts.domain)

    # Posterior variance: diag(K_** - v^T v) with v = L^{-1} k_*^T.
    k_ss = _kernel_matrix(x_sample, x_sample, opts)
    if opts.noise:
        k_ss += opts.noise**2 * np.diag(in_x.astype(float))

    v = np.linalg.solve(chol_l, k_star.T)
    var_diag = np.diag(k_ss) - np.sum(v**2, axis=0)
    # Clip tiny negatives caused by floating-point cancellation.
    var_diag = np.maximum(var_diag, 0.0)
    f_var = Chebfun.initfun_fixedlen(lambda _z: var_diag, sample_size, opts.domain)

    if n_samples <= 0:
        return f_mean, f_var

    # Posterior samples: symmetrise and jitter the posterior covariance, then
    # draw correlated normals through its Cholesky factor.
    cov_post = k_ss - v.T @ v
    cov_post = 0.5 * (cov_post + cov_post.T)
    cov_post += 1e-12 * scaling_factor**2 * n * np.eye(sample_size)
    chol_s = np.linalg.cholesky(cov_post)

    draws = mean_vals[:, None] + chol_s @ np.random.randn(sample_size, n_samples)
    cols: list[Chebfun] = []
    for j in range(n_samples):
        cols.append(
            Chebfun.initfun_fixedlen(
                # Bind j as a default so each column captures its own draw.
                lambda _z, _j=j: draws[:, _j],
                sample_size,
                opts.domain,
            )
        )
    return f_mean, f_var, Quasimatrix(cols)
def gpr(
    x: ArrayLike,
    y: ArrayLike,
    *,
    domain: tuple[float, float] | list[float] | np.ndarray | None = None,
    sigma: float | None = None,
    length_scale: float | None = None,
    noise: float = 0.0,
    trig: bool = False,
    n_samples: int = 0,
) -> tuple[Chebfun, Chebfun] | tuple[Chebfun, Chebfun, Quasimatrix]:
    """Gaussian process regression returning Chebfun objects.

    Given observations ``(x, y)`` of a latent function, compute the posterior
    mean and variance of a Gaussian process with zero prior mean and a squared
    exponential kernel::

        k(x, x') = sigma**2 * exp(-0.5 / L**2 * (x - x')**2)

    When ``trig=True`` a periodic variant is used instead::

        k(x, x') = sigma**2 * exp(-2 / L**2 * sin(pi * (x - x') / P)**2)

    where *P* is the period (length of the domain).

    Args:
        x: Observation locations (1-D array-like). May be empty, in which
            case the prior mean and variance are returned.
        y: Observation values (same length as *x*).
        domain: Domain ``[a, b]`` for the output Chebfuns. Defaults to
            ``[min(x), max(x)]`` (or slightly extended for ``trig``).
        sigma: Signal standard deviation of the kernel (the ``sigma`` in the
            formulas above). Defaults to ``max(|y|)``.
        length_scale: Length-scale *L* of the kernel. If ``None``, it is
            chosen to maximise the log marginal likelihood.
        noise: Standard deviation of i.i.d. Gaussian observation noise.
            The kernel diagonal is augmented by ``noise**2``.
        trig: If ``True``, use a periodic squared-exponential kernel.
        n_samples: Number of independent posterior samples to draw. When
            positive, a :class:`Quasimatrix` with *n_samples* columns is
            returned as the third element of the output tuple.

    Returns:
        ``(f_mean, f_var)`` — posterior mean and variance as Chebfun objects.
        If ``n_samples > 0``, returns ``(f_mean, f_var, samples)`` where
        *samples* is a Quasimatrix whose columns are independent draws from
        the posterior.

    Raises:
        ValueError: If *x* and *y* have different lengths.

    Examples:
        >>> import numpy as np
        >>> from chebpy.gpr import gpr
        >>> rng = np.random.default_rng(1)
        >>> x = -2 + 4 * rng.random(10)
        >>> y = np.sin(np.exp(x))
        >>> f_mean, f_var = gpr(x, y, domain=[-2, 2])

    Reference:
        C. E. Rasmussen & C. K. I. Williams, "Gaussian Processes for Machine
        Learning", MIT Press, 2006.
    """
    x_arr, y_arr, opts, scaling_factor = _parse_inputs(
        x,
        y,
        sigma=sigma,
        noise=noise,
        trig=trig,
        n_samples=n_samples,
    )
    _infer_domain(x_arr, opts, domain)
    _infer_length_scale(x_arr, y_arr, opts, scaling_factor, length_scale)

    # No data → return prior
    if len(x_arr) == 0:
        f_mean = Chebfun.initconst(0.0, opts.domain)
        f_var = Chebfun.initconst(opts.sigma**2, opts.domain)
        if n_samples > 0:
            return f_mean, f_var, _prior_samples(opts, scaling_factor, n_samples)
        return f_mean, f_var

    return _posterior_chebfuns(x_arr, y_arr, opts, scaling_factor, n_samples)
def _prior_samples(
    opts: _GPROptions,
    scaling_factor: float,
    n_samples: int,
) -> Quasimatrix:
    """Draw samples from the GP prior (no observations).

    Args:
        opts: Options with domain, kernel parameters, and trig flag set.
        scaling_factor: Data scale used to size the stabilising jitter.
        n_samples: Number of independent prior draws (columns of the result).

    Returns:
        A Quasimatrix whose columns are draws from the zero-mean prior.
    """
    sample_size = 1000
    if opts.trig:
        # NOTE(review): draw locations are equispaced here, but
        # initfun_fixedlen presumably evaluates on a Chebyshev-type grid —
        # confirm the two grids agree for the periodic case.
        x_sample = np.linspace(opts.domain[0], opts.domain[1], sample_size)
    else:
        # Chebyshev points mapped from [-1, 1] onto opts.domain.
        t = chebpts2(sample_size)
        x_sample = 0.5 * (opts.domain[1] - opts.domain[0]) * t + 0.5 * (opts.domain[0] + opts.domain[1])

    # Jitter the prior covariance so the Cholesky factorisation succeeds.
    k_ss = _kernel_matrix(x_sample, x_sample, opts)
    k_ss += 1e-12 * scaling_factor**2 * np.eye(sample_size)
    chol_s = np.linalg.cholesky(k_ss)

    # Zero prior mean plus correlated Gaussian draws.
    f_mean_vals = np.zeros(sample_size)
    draws = f_mean_vals[:, None] + chol_s @ np.random.randn(sample_size, n_samples)

    cols: list[Chebfun] = []
    for j in range(n_samples):
        cols.append(
            Chebfun.initfun_fixedlen(
                # Bind j as a default so each column captures its own draw.
                lambda _z, _j=j: draws[:, _j],
                sample_size,
                opts.domain,
            )
        )
    return Quasimatrix(cols)