Coverage for src / chebpy / gpr.py: 100%

172 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-22 21:33 +0000

1"""Gaussian process regression with Chebfun representations. 

2 

3Implements Gaussian process regression (GPR) following the algorithm described 

4in Rasmussen & Williams, *Gaussian Processes for Machine Learning*, MIT Press, 

52006, and the MATLAB Chebfun ``gpr.m`` by The University of Oxford and The 

6Chebfun Developers. 

7 

8The posterior mean, variance, and (optionally) random samples from the 

9posterior are all returned as Chebfun / Quasimatrix objects so that they can 

10be manipulated with the full ChebPy toolkit (differentiation, integration, 

11rootfinding, etc.). 

12 

13Reference: 

14 C. E. Rasmussen & C. K. I. Williams, "Gaussian Processes for Machine 

15 Learning", MIT Press, 2006. 

16""" 

17 

18from __future__ import annotations 

19 

20from collections.abc import Callable 

21from dataclasses import dataclass, field 

22 

23import numpy as np 

24from numpy.typing import ArrayLike 

25 

26from .algorithms import chebpts2 

27from .chebfun import Chebfun 

28from .quasimatrix import Quasimatrix 

29 

30 

31# --------------------------------------------------------------------------- 

32# Options container 

33# --------------------------------------------------------------------------- 

@dataclass
class _GPROptions:
    """Parsed options for a GPR call.

    Collects the kernel hyperparameters and output settings shared by the
    helper functions in this module.  Instances are mutated in place by the
    ``_infer_*`` helpers while the inputs are being processed.
    """

    # Signal standard deviation of the kernel (defaulted to max(|y|) by
    # _parse_inputs when the caller does not supply one).
    sigma: float = 1.0
    # True when the caller supplied sigma explicitly; controls whether the
    # length-scale optimisation runs on normalised data (_infer_length_scale).
    sigma_given: bool = False
    # Kernel length-scale L; 0.0 means "not yet determined".
    length_scale: float = 0.0
    # Standard deviation of i.i.d. observation noise (0.0 = noise-free).
    noise: float = 0.0
    # Domain [a, b] of the output Chebfuns.
    domain: np.ndarray = field(default_factory=lambda: np.array([-1.0, 1.0]))
    # If True, use the periodic (trigonometric) squared-exponential kernel.
    trig: bool = False
    # Number of posterior samples requested by the caller.
    n_samples: int = 0

45 

46 

47# --------------------------------------------------------------------------- 

48# Kernel helpers 

49# --------------------------------------------------------------------------- 

50 

51 

52def _kernel_matrix( 

53 x1: np.ndarray, 

54 x2: np.ndarray, 

55 opts: _GPROptions, 

56) -> np.ndarray: 

57 """Evaluate the covariance kernel k(x1_i, x2_j) for all pairs.""" 

58 r = x1[:, None] - x2[None, :] 

59 if opts.trig: 

60 period = opts.domain[1] - opts.domain[0] 

61 return opts.sigma**2 * np.exp(-2.0 / opts.length_scale**2 * np.sin(np.pi / period * r) ** 2) 

62 return opts.sigma**2 * np.exp(-0.5 / opts.length_scale**2 * r**2) 

63 

64 

65def _log_marginal_likelihood( 

66 length_scale: float | np.ndarray, 

67 x: np.ndarray, 

68 y: np.ndarray, 

69 opts: _GPROptions, 

70) -> float | np.ndarray: 

71 """Negative log marginal likelihood (eq. 2.30 in Rasmussen & Williams). 

72 

73 Accepts scalar or array *length_scale* so that it can be wrapped as a 

74 Chebfun for optimisation. 

75 """ 

76 scalar_input = np.ndim(length_scale) == 0 

77 ls = np.atleast_1d(np.asarray(length_scale, dtype=float)) 

78 n = len(x) 

79 rx = x[:, None] - x[None, :] 

80 result = np.empty_like(ls) 

81 

82 for idx in np.ndindex(ls.shape): 

83 l_val = ls[idx] 

84 if opts.trig: 

85 period = opts.domain[1] - opts.domain[0] 

86 cov_mat = opts.sigma**2 * np.exp(-2.0 / l_val**2 * np.sin(np.pi / period * rx) ** 2) 

87 else: 

88 cov_mat = opts.sigma**2 * np.exp(-0.5 / l_val**2 * rx**2) 

89 

90 if opts.noise != 0: 

91 cov_mat += opts.noise**2 * np.eye(n) 

92 else: 

93 cov_mat += 1e-15 * n * opts.sigma**2 * np.eye(n) 

94 

95 chol_l = np.linalg.cholesky(cov_mat) 

96 alpha = np.linalg.solve(chol_l.T, np.linalg.solve(chol_l, y)) 

97 lml = -0.5 * y @ alpha - np.sum(np.log(np.diag(chol_l))) - 0.5 * n * np.log(2 * np.pi) 

98 result[idx] = lml 

99 

100 return float(result.item()) if scalar_input else result.ravel() 

101 

102 

103# --------------------------------------------------------------------------- 

104# Length-scale selection via max log marginal likelihood 

105# --------------------------------------------------------------------------- 

106 

107 

def _select_length_scale(x: np.ndarray, y: np.ndarray, opts: _GPROptions) -> float:
    """Pick the length-scale that maximises the log marginal likelihood.

    Starts from a heuristic search interval, shrinks its right end while
    the likelihood keeps decreasing (mirroring the MATLAB implementation),
    then refines the result with a golden-section search.
    """
    n = len(x)
    width = opts.domain[1] - opts.domain[0]

    def lml(ls: float) -> float:
        return float(_log_marginal_likelihood(ls, x, y, opts))

    if opts.trig:
        left = 1.0 / (2 * n)
        right = 10.0
    else:
        left = width / (2 * np.pi * n)
        right = 10.0 / np.pi * width

    # Shrink the right end of the bracket while the lml looks monotonically
    # decreasing over it; stop once the bracket is essentially collapsed.
    f_left = lml(left)
    f_right = lml(right)
    while f_left > f_right and right / left > 1 + 1e-4:
        candidate = left + (right - left) / 10.0
        f_candidate = lml(candidate)
        if f_candidate > f_left:
            break
        right = candidate
        f_right = f_candidate

    # Maximise over the final bracket.
    return _golden_section_max(lml, left, right)

132 

133 

134def _golden_section_max(f: Callable[[float], float], a: float, b: float, tol: float = 1e-6) -> float: 

135 """Golden-section search for the scalar argmax of *f* on [a, b].""" 

136 gr = (np.sqrt(5.0) + 1.0) / 2.0 

137 c = b - (b - a) / gr 

138 d = a + (b - a) / gr 

139 while abs(b - a) > tol * (abs(a) + abs(b)): 

140 if f(c) > f(d): 

141 b = d 

142 else: 

143 a = c 

144 c = b - (b - a) / gr 

145 d = a + (b - a) / gr 

146 return 0.5 * (a + b) 

147 

148 

149# --------------------------------------------------------------------------- 

150# Public API 

151# --------------------------------------------------------------------------- 

152 

153 

def _parse_inputs(
    x: ArrayLike,
    y: ArrayLike,
    *,
    sigma: float | None,
    noise: float,
    trig: bool,
    n_samples: int,
) -> tuple[np.ndarray, np.ndarray, _GPROptions, float]:
    """Validate inputs and build the initial options container.

    Returns ``(x_arr, y_arr, opts, scaling_factor)``.  The scaling factor
    is ``max(|y|)`` when *sigma* was not supplied (1.0 otherwise) and is
    later used to normalise the data during length-scale selection.

    Raises:
        ValueError: If *x* and *y* have different lengths.
    """
    x_arr = np.ravel(np.asarray(x, dtype=float))
    y_arr = np.ravel(np.asarray(y, dtype=float))
    if x_arr.shape != y_arr.shape:
        raise ValueError("x and y must have the same length.")

    opts = _GPROptions(trig=trig, noise=noise, n_samples=n_samples)

    scaling_factor = 1.0
    if sigma is None:
        # Default the signal standard deviation to the data scale.
        if len(y_arr) > 0:
            scaling_factor = float(np.max(np.abs(y_arr)))
        opts.sigma_given = False
        opts.sigma = scaling_factor
    else:
        opts.sigma = sigma
        opts.sigma_given = True

    return x_arr, y_arr, opts, scaling_factor

186 

187 

188def _infer_domain( 

189 x_arr: np.ndarray, 

190 opts: _GPROptions, 

191 domain: tuple[float, float] | list[float] | np.ndarray | None, 

192) -> None: 

193 """Set ``opts.domain`` from *domain* or from the observation locations.""" 

194 if domain is not None: 

195 opts.domain = np.asarray(domain, dtype=float) 

196 elif len(x_arr) == 0: 

197 opts.domain = np.array([-1.0, 1.0]) 

198 elif len(x_arr) == 1: 

199 opts.domain = np.array([x_arr[0] - 1, x_arr[0] + 1]) 

200 elif opts.trig: 

201 span = float(np.max(x_arr) - np.min(x_arr)) 

202 opts.domain = np.array([float(np.min(x_arr)), float(np.max(x_arr)) + 0.1 * span]) 

203 else: 

204 opts.domain = np.array([float(np.min(x_arr)), float(np.max(x_arr))]) 

205 

206 

207def _infer_length_scale( 

208 x_arr: np.ndarray, 

209 y_arr: np.ndarray, 

210 opts: _GPROptions, 

211 scaling_factor: float, 

212 length_scale: float | None, 

213) -> None: 

214 """Set ``opts.length_scale`` — user-supplied or auto-selected.""" 

215 if length_scale is not None: 

216 opts.length_scale = length_scale 

217 return 

218 

219 if len(x_arr) == 0: 

220 opts.length_scale = 1.0 

221 return 

222 

223 y_n = y_arr / scaling_factor if scaling_factor != 0 else y_arr 

224 

225 if not opts.sigma_given: 

226 tmp = _GPROptions( 

227 sigma=1.0, 

228 sigma_given=True, 

229 noise=opts.noise / scaling_factor if scaling_factor != 0 else opts.noise, 

230 domain=opts.domain, 

231 trig=opts.trig, 

232 ) 

233 y_opt = y_n 

234 else: 

235 tmp = _GPROptions( 

236 sigma=opts.sigma, 

237 sigma_given=True, 

238 noise=opts.noise, 

239 domain=opts.domain, 

240 trig=opts.trig, 

241 ) 

242 y_opt = y_arr 

243 

244 opts.length_scale = _select_length_scale(x_arr, y_opt, tmp) 

245 

246 

def _posterior_chebfuns(
    x_arr: np.ndarray,
    y_arr: np.ndarray,
    opts: _GPROptions,
    scaling_factor: float,
    n_samples: int,
) -> tuple[Chebfun, Chebfun] | tuple[Chebfun, Chebfun, Quasimatrix]:
    """Compute posterior mean, variance, and optional samples as Chebfuns.

    A Cholesky factor of the (noise- or jitter-augmented) data covariance
    yields the predictive mean and variance on a shared Chebyshev grid,
    which is then wrapped into fixed-length Chebfun objects.
    """
    n = len(x_arr)

    # Covariance of the observations, augmented on the diagonal by the
    # observation noise or — in the noise-free case — by a tiny jitter
    # that keeps the Cholesky factorisation well posed.
    train_cov = _kernel_matrix(x_arr, x_arr, opts)
    if opts.noise == 0:
        train_cov += 1e-15 * scaling_factor**2 * n * np.eye(n)
    else:
        train_cov += opts.noise**2 * np.eye(n)

    lower = np.linalg.cholesky(train_cov)
    alpha = np.linalg.solve(lower.T, np.linalg.solve(lower, y_arr))

    # Shared Chebyshev grid mapped onto the working domain.
    sample_size = min(20 * n, 2000)
    grid = chebpts2(sample_size)
    x_sample = 0.5 * (opts.domain[1] - opts.domain[0]) * grid + 0.5 * (opts.domain[0] + opts.domain[1])

    # Grid points that coincide exactly with observation locations.
    coincides = np.isin(x_sample, x_arr)

    k_star = _kernel_matrix(x_sample, x_arr, opts)
    if opts.noise:
        # Add the noise variance where a grid point equals a data point.
        k_star += opts.noise**2 * (np.abs(x_sample[:, None] - x_arr[None, :]) == 0)

    # Posterior mean.
    mean_vals = k_star @ alpha
    f_mean = Chebfun.initfun_fixedlen(lambda _z: mean_vals, sample_size, opts.domain)

    # Posterior variance (diagonal of the predictive covariance).
    k_ss = _kernel_matrix(x_sample, x_sample, opts)
    if opts.noise:
        k_ss += opts.noise**2 * np.diag(coincides.astype(float))

    v = np.linalg.solve(lower, k_star.T)
    var_diag = np.maximum(np.diag(k_ss) - np.sum(v**2, axis=0), 0.0)
    f_var = Chebfun.initfun_fixedlen(lambda _z: var_diag, sample_size, opts.domain)

    if n_samples <= 0:
        return f_mean, f_var

    # Posterior samples: symmetrise the predictive covariance and add a
    # small jitter so its Cholesky factorisation succeeds.
    cov_post = k_ss - v.T @ v
    cov_post = 0.5 * (cov_post + cov_post.T)
    cov_post += 1e-12 * scaling_factor**2 * n * np.eye(sample_size)
    chol_s = np.linalg.cholesky(cov_post)

    draws = mean_vals[:, None] + chol_s @ np.random.randn(sample_size, n_samples)
    # Bind the column index as a lambda default to avoid late binding.
    cols: list[Chebfun] = [
        Chebfun.initfun_fixedlen(lambda _z, _j=j: draws[:, _j], sample_size, opts.domain)
        for j in range(n_samples)
    ]
    return f_mean, f_var, Quasimatrix(cols)

310 

311 

def gpr(
    x: ArrayLike,
    y: ArrayLike,
    *,
    domain: tuple[float, float] | list[float] | np.ndarray | None = None,
    sigma: float | None = None,
    length_scale: float | None = None,
    noise: float = 0.0,
    trig: bool = False,
    n_samples: int = 0,
) -> tuple[Chebfun, Chebfun] | tuple[Chebfun, Chebfun, Quasimatrix]:
    """Gaussian process regression returning Chebfun objects.

    Given observations ``(x, y)`` of a latent function, compute the posterior
    mean and variance of a Gaussian process with zero prior mean and a squared
    exponential kernel::

        k(x, x') = sigma**2 * exp(-0.5 / L**2 * (x - x')**2)

    When ``trig=True`` a periodic variant is used instead::

        k(x, x') = sigma**2 * exp(-2 / L**2 * sin(pi * (x - x') / P)**2)

    where *P* is the period (length of the domain).

    Args:
        x: Observation locations (1-D array-like). May be empty, in which
            case the prior mean (zero) and variance are returned.
        y: Observation values (same length as *x*).
        domain: Domain ``[a, b]`` for the output Chebfuns. Defaults to
            ``[min(x), max(x)]`` (or slightly extended for ``trig``).
        sigma: Signal variance of the kernel. Defaults to ``max(|y|)``.
        length_scale: Length-scale *L* of the kernel. If ``None``, it is
            chosen to maximise the log marginal likelihood.
        noise: Standard deviation of i.i.d. Gaussian observation noise.
            The kernel diagonal is augmented by ``noise**2``.
        trig: If ``True``, use a periodic squared-exponential kernel.
        n_samples: Number of independent posterior samples to draw. When
            positive, a :class:`Quasimatrix` with *n_samples* columns is
            returned as the third element of the output tuple.

    Returns:
        ``(f_mean, f_var)`` — posterior mean and variance as Chebfun objects.
        If ``n_samples > 0``, returns ``(f_mean, f_var, samples)`` where
        *samples* is a Quasimatrix whose columns are independent draws from
        the posterior (draws from the prior when *x* is empty).

    Raises:
        ValueError: If *x* and *y* have different lengths.

    Examples:
        >>> import numpy as np
        >>> from chebpy.gpr import gpr
        >>> rng = np.random.default_rng(1)
        >>> x = -2 + 4 * rng.random(10)
        >>> y = np.sin(np.exp(x))
        >>> f_mean, f_var = gpr(x, y, domain=[-2, 2])

    Reference:
        C. E. Rasmussen & C. K. I. Williams, "Gaussian Processes for Machine
        Learning", MIT Press, 2006.
    """
    x_arr, y_arr, opts, scaling_factor = _parse_inputs(
        x,
        y,
        sigma=sigma,
        noise=noise,
        trig=trig,
        n_samples=n_samples,
    )
    _infer_domain(x_arr, opts, domain)
    _infer_length_scale(x_arr, y_arr, opts, scaling_factor, length_scale)

    # No data → return prior
    if len(x_arr) == 0:
        f_mean = Chebfun.initconst(0.0, opts.domain)
        f_var = Chebfun.initconst(opts.sigma**2, opts.domain)
        if n_samples > 0:
            return f_mean, f_var, _prior_samples(opts, scaling_factor, n_samples)
        return f_mean, f_var

    return _posterior_chebfuns(x_arr, y_arr, opts, scaling_factor, n_samples)

393 

394 

def _prior_samples(
    opts: _GPROptions,
    scaling_factor: float,
    n_samples: int,
) -> Quasimatrix:
    """Draw samples from the GP prior (no observations).

    Evaluates the prior covariance on a 1000-point grid, factorises it,
    and wraps each correlated Gaussian draw as a fixed-length Chebfun.

    NOTE(review): in the trig branch the draws are computed on an
    equispaced grid but handed to ``Chebfun.initfun_fixedlen`` exactly
    like the Chebyshev-grid case — confirm that is the intended
    representation for periodic output.
    """
    sample_size = 1000
    if opts.trig:
        # Periodic kernel: sample on an equispaced grid.
        x_sample = np.linspace(opts.domain[0], opts.domain[1], sample_size)
    else:
        nodes = chebpts2(sample_size)
        x_sample = 0.5 * (opts.domain[1] - opts.domain[0]) * nodes + 0.5 * (opts.domain[0] + opts.domain[1])

    # Prior covariance with a small jitter for a stable Cholesky factor.
    prior_cov = _kernel_matrix(x_sample, x_sample, opts)
    prior_cov += 1e-12 * scaling_factor**2 * np.eye(sample_size)
    chol = np.linalg.cholesky(prior_cov)

    # Zero prior mean plus correlated Gaussian draws.
    draws = chol @ np.random.randn(sample_size, n_samples)
    # Bind the column index as a lambda default to avoid late binding.
    cols: list[Chebfun] = [
        Chebfun.initfun_fixedlen(lambda _z, _j=j: draws[:, _j], sample_size, opts.domain)
        for j in range(n_samples)
    ]
    return Quasimatrix(cols)