Source code for pinnx.fnspace

# Rewrite of the original file in DeepXDE: https://github.com/lululxvi/deepxde
# ==============================================================================


__all__ = [
    "FunctionSpace",
    "PowerSeries",
    "Chebyshev",
    "GRF",
    "GRF_KL",
    "GRF2D",
    "wasserstein2",
]

import abc
import importlib.util

import brainstate
import numpy as np
from scipy import linalg, interpolate

from pinnx.utils import isclose

sklearn_installed = importlib.util.find_spec("sklearn")
if sklearn_installed:
    from sklearn import gaussian_process as gp


[docs] class FunctionSpace(abc.ABC): """ Function space base class. Example: .. code-block:: python space = pinnx.fnspace.GRF() feats = space.random(10) xs = np.linspace(0, 1, num=100)[:, None] y = space.eval_batch(feats, xs) """
[docs] @abc.abstractmethod def random(self, size): """Generate feature vectors of random functions. Args: size (int): The number of random functions to generate. Returns: A NumPy array of shape (`size`, n_features). """
[docs] @abc.abstractmethod def eval_one(self, feature, x): """Evaluate the function at one point. Args: feature: The feature vector of the function to be evaluated. x: The point to be evaluated. Returns: float: The function value at `x`. """
[docs] @abc.abstractmethod def eval_batch(self, features, xs): """Evaluate a list of functions at a list of points. Args: features: A NumPy array of shape (n_functions, n_features). A list of the feature vectors of the functions to be evaluated. xs: A NumPy array of shape (n_points, dim). A list of points to be evaluated. Returns: A NumPy array of shape (n_functions, n_points). The values of different functions at different points. """
[docs] class PowerSeries(FunctionSpace): r"""Power series. p(x) = \sum_{i=0}^{N-1} a_i x^i Args: N (int): The number of terms in the power series. M (float): `M` > 0. The coefficients a_i are randomly sampled from [-`M`, `M`]. """ def __init__(self, N: int = 100, M: float = 1): self.N = N self.M = M
[docs] def random(self, size): return 2 * self.M * np.random.rand(size, self.N).astype(brainstate.environ.dftype()) - self.M
[docs] def eval_one(self, feature, x): return np.dot(feature, x ** np.arange(self.N))
[docs] def eval_batch(self, features, xs): mat = np.ones((self.N, len(xs)), dtype=brainstate.environ.dftype()) for i in range(1, self.N): mat[i] = np.ravel(xs ** i) return np.dot(features, mat)
[docs] class Chebyshev(FunctionSpace): r""" Chebyshev polynomial. p(x) = \sum_{i=0}^{N-1} a_i T_i(x), where T_i is Chebyshev polynomial of the first kind. Note: The domain of x is scaled from [-1, 1] to [0, 1]. Args: N (int): The number of terms in the Chebyshev polynomial. M (float): `M` > 0. The coefficients a_i are randomly sampled from [-`M`, `M`]. """ def __init__(self, N: int = 100, M: float = 1): self.N = N self.M = M
[docs] def random(self, size): return 2 * self.M * np.random.rand(size, self.N) - self.M
[docs] def eval_one(self, feature, x): return np.polynomial.chebyshev.chebval(2 * x - 1, feature)
[docs] def eval_batch(self, features, xs): return np.polynomial.chebyshev.chebval(2 * np.ravel(xs) - 1, features.T).astype(brainstate.environ.dftype())
[docs] class GRF(FunctionSpace): """ Gaussian random field (Gaussian process) in 1D. The random sampling algorithm is based on Cholesky decomposition of the covariance matrix. Args: T (float): `T` > 0. The domain is [0, `T`]. kernel (str): Name of the kernel function. "RBF" (radial-basis function kernel, squared-exponential kernel, Gaussian kernel), "AE" (absolute exponential kernel), or "ExpSineSquared" (Exp-Sine-Squared kernel, periodic kernel). length_scale (float): The length scale of the kernel. N (int): The size of the covariance matrix. interp (str): The interpolation to interpolate the random function. "linear", "quadratic", or "cubic". """ def __init__( self, T: float = 1, kernel: str = "RBF", length_scale: float = 1, N: int = 1000, interp: str = "cubic" ): if not sklearn_installed: raise ImportError("scikit-learn is required to use GRF. " "Please install it via 'pip install scikit-learn'.") self.N = N self.interp = interp self.x = np.linspace(0, T, num=N)[:, None] if kernel == "RBF": K = gp.kernels.RBF(length_scale=length_scale) elif kernel == "AE": K = gp.kernels.Matern(length_scale=length_scale, nu=0.5) elif kernel == "ExpSineSquared": K = gp.kernels.ExpSineSquared(length_scale=length_scale, periodicity=T) self.K = K(self.x) self.L = np.linalg.cholesky(self.K + 1e-13 * np.eye(self.N))
[docs] def random(self, size): u = np.random.randn(self.N, size) return np.dot(self.L, u).T
[docs] def eval_one(self, feature, x): if self.interp == "linear": return np.interp(x, np.ravel(self.x), feature) f = interpolate.interp1d(np.ravel(self.x), feature, kind=self.interp, copy=False, assume_sorted=True) return f(x)
[docs] def eval_batch(self, features, xs): if self.interp == "linear": return np.vstack([np.interp(xs, np.ravel(self.x), y).T for y in features]) res = map( lambda y: interpolate.interp1d(np.ravel(self.x), y, kind=self.interp, copy=False, assume_sorted=True)(xs).T, features, ) return np.vstack(list(res)).astype(brainstate.environ.dftype())
[docs] class GRF_KL(FunctionSpace): """Gaussian random field (Gaussian process) in 1D. The random sampling algorithm is based on truncated Karhunen-Loeve (KL) expansion. Args: T (float): `T` > 0. The domain is [0, `T`]. kernel (str): The kernel function. "RBF" (radial-basis function) or "AE" (absolute exponential). length_scale (float): The length scale of the kernel. num_eig (int): The number of eigenfunctions in KL expansion to be kept. N (int): Each eigenfunction is discretized at `N` points in [0, `T`]. interp (str): The interpolation to interpolate the random function. "linear", "quadratic", or "cubic". """ def __init__( self, T: float = 1, kernel: str = "RBF", length_scale: float = 1, num_eig: int = 10, N: int = 100, interp: str = "cubic" ): if not sklearn_installed: raise ImportError("scikit-learn is required to use GRF_KL. " "Please install it via 'pip install scikit-learn'.") if not isclose(T, 1): raise ValueError("GRF_KL only supports T = 1.") self.num_eig = num_eig if kernel == "RBF": kernel = gp.kernels.RBF(length_scale=length_scale) elif kernel == "AE": kernel = gp.kernels.Matern(length_scale=length_scale, nu=0.5) eigval, eigvec = eig(kernel, num_eig, N, eigenfunction=True) eigvec *= eigval ** 0.5 x = np.linspace(0, T, num=N) self.eigfun = [ interpolate.interp1d(x, y, kind=interp, copy=False, assume_sorted=True) for y in eigvec.T ]
[docs] def bases(self, sensors): """Evaluate the eigenfunctions at a list of points `sensors`.""" return np.array([np.ravel(f(sensors)) for f in self.eigfun])
[docs] def random(self, size): return np.random.randn(size, self.num_eig).astype(brainstate.environ.dftype())
[docs] def eval_one(self, feature, x): eigfun = [f(x) for f in self.eigfun] return np.sum(eigfun * feature)
[docs] def eval_batch(self, features, xs): eigfun = np.array([np.ravel(f(xs)) for f in self.eigfun], dtype=brainstate.environ.dftype()) return np.dot(features, eigfun)
[docs] class GRF2D(FunctionSpace): """Gaussian random field in [0, 1]x[0, 1]. The random sampling algorithm is based on Cholesky decomposition of the covariance matrix. Args: kernel (str): The kernel function. "RBF" (radial-basis function) or "AE" (absolute exponential). length_scale (float): The length scale of the kernel. N (int): The size of the covariance matrix. interp (str): The interpolation to interpolate the random function. "linear" or "splinef2d". Example: .. code-block:: python space = pinnx.fnspace.GRF2D(length_scale=0.1) features = space.random(3) x = np.linspace(0, 1, num=500) y = np.linspace(0, 1, num=500) xv, yv = np.meshgrid(x, y) sensors = np.vstack((np.ravel(xv), np.ravel(yv))).T u = space.eval_batch(features, sensors) for ui in u: plt.figure() plt.imshow(np.reshape(ui, (len(y), len(x)))) plt.colorbar() plt.show() """ def __init__( self, kernel: str = "RBF", length_scale: float = 1, N: int = 100, interp: str = "splinef2d" ): if not sklearn_installed: raise ImportError("scikit-learn is required to use GRF2D. " "Please install it via 'pip install scikit-learn'.") self.N = N self.interp = interp self.x = np.linspace(0, 1, num=N) self.y = np.linspace(0, 1, num=N) xv, yv = np.meshgrid(self.x, self.y) self.X = np.vstack((np.ravel(xv), np.ravel(yv))).T if kernel == "RBF": K = gp.kernels.RBF(length_scale=length_scale) elif kernel == "AE": K = gp.kernels.Matern(length_scale=length_scale, nu=0.5) self.K = K(self.X) self.L = np.linalg.cholesky(self.K + 1e-12 * np.eye(self.N ** 2))
[docs] def random(self, size): u = np.random.randn(self.N ** 2, size) return np.dot(self.L, u).T
[docs] def eval_one(self, feature, x): y = np.reshape(feature, (self.N, self.N)) return interpolate.interpn((self.x, self.y), y, x, method=self.interp)[0]
[docs] def eval_batch(self, features, xs): points = (self.x, self.y) ys = np.reshape(features, (-1, self.N, self.N)) res = map(lambda y: interpolate.interpn(points, y, xs, method=self.interp), ys) return np.vstack(list(res)).astype(brainstate.environ.dftype())
[docs] def wasserstein2(space1, space2): """Compute 2-Wasserstein (W2) metric to measure the distance between two ``GRF``.""" return ( np.trace(space1.K + space2.K - 2 * linalg.sqrtm(space1.K @ space2.K)) ** 0.5 / space1.N ** 0.5 )
def eig(kernel, num, Nx, eigenfunction=True): """Compute the eigenvalues and eigenfunctions of a kernel function in [0, 1].""" h = 1 / (Nx - 1) c = kernel(np.linspace(0, 1, num=Nx)[:, None])[0] * h A = np.empty((Nx, Nx)) for i in range(Nx): A[i, i:] = c[: Nx - i] A[i, i::-1] = c[: i + 1] A[:, 0] *= 0.5 A[:, -1] *= 0.5 if not eigenfunction: return np.flipud(np.sort(np.real(np.linalg.eigvals(A))))[:num] eigval, eigvec = np.linalg.eig(A) eigval, eigvec = np.real(eigval), np.real(eigvec) idx = np.flipud(np.argsort(eigval))[:num] eigval, eigvec = eigval[idx], eigvec[:, idx] for i in range(num): eigvec[:, i] /= np.trapz(eigvec[:, i] ** 2, dx=h) ** 0.5 return eigval, eigvec