# Rewrite of the original file in DeepXDE: https://github.com/lululxvi/deepxde
# ==============================================================================
__all__ = [
"FunctionSpace",
"PowerSeries",
"Chebyshev",
"GRF",
"GRF_KL",
"GRF2D",
"wasserstein2",
]
import abc
import importlib.util
import brainstate
import numpy as np
from scipy import linalg, interpolate
from pinnx.utils import isclose
sklearn_installed = importlib.util.find_spec("sklearn")
if sklearn_installed:
from sklearn import gaussian_process as gp
[docs]
class FunctionSpace(abc.ABC):
"""
Function space base class.
Example:
.. code-block:: python
space = pinnx.fnspace.GRF()
feats = space.random(10)
xs = np.linspace(0, 1, num=100)[:, None]
y = space.eval_batch(feats, xs)
"""
[docs]
@abc.abstractmethod
def random(self, size):
"""Generate feature vectors of random functions.
Args:
size (int): The number of random functions to generate.
Returns:
A NumPy array of shape (`size`, n_features).
"""
[docs]
@abc.abstractmethod
def eval_one(self, feature, x):
"""Evaluate the function at one point.
Args:
feature: The feature vector of the function to be evaluated.
x: The point to be evaluated.
Returns:
float: The function value at `x`.
"""
[docs]
@abc.abstractmethod
def eval_batch(self, features, xs):
"""Evaluate a list of functions at a list of points.
Args:
features: A NumPy array of shape (n_functions, n_features). A list of the
feature vectors of the functions to be evaluated.
xs: A NumPy array of shape (n_points, dim). A list of points to be
evaluated.
Returns:
A NumPy array of shape (n_functions, n_points). The values of
different functions at different points.
"""
[docs]
class PowerSeries(FunctionSpace):
r"""Power series.
p(x) = \sum_{i=0}^{N-1} a_i x^i
Args:
N (int): The number of terms in the power series.
M (float): `M` > 0. The coefficients a_i are randomly sampled from [-`M`, `M`].
"""
def __init__(self, N: int = 100, M: float = 1):
self.N = N
self.M = M
[docs]
def random(self, size):
return 2 * self.M * np.random.rand(size, self.N).astype(brainstate.environ.dftype()) - self.M
[docs]
def eval_one(self, feature, x):
return np.dot(feature, x ** np.arange(self.N))
[docs]
def eval_batch(self, features, xs):
mat = np.ones((self.N, len(xs)), dtype=brainstate.environ.dftype())
for i in range(1, self.N):
mat[i] = np.ravel(xs ** i)
return np.dot(features, mat)
[docs]
class Chebyshev(FunctionSpace):
r"""
Chebyshev polynomial.
p(x) = \sum_{i=0}^{N-1} a_i T_i(x),
where T_i is Chebyshev polynomial of the first kind.
Note: The domain of x is scaled from [-1, 1] to [0, 1].
Args:
N (int): The number of terms in the Chebyshev polynomial.
M (float): `M` > 0. The coefficients a_i are randomly sampled from [-`M`, `M`].
"""
def __init__(self, N: int = 100, M: float = 1):
self.N = N
self.M = M
[docs]
def random(self, size):
return 2 * self.M * np.random.rand(size, self.N) - self.M
[docs]
def eval_one(self, feature, x):
return np.polynomial.chebyshev.chebval(2 * x - 1, feature)
[docs]
def eval_batch(self, features, xs):
return np.polynomial.chebyshev.chebval(2 * np.ravel(xs) - 1, features.T).astype(brainstate.environ.dftype())
[docs]
class GRF(FunctionSpace):
"""
Gaussian random field (Gaussian process) in 1D.
The random sampling algorithm is based on Cholesky decomposition of the covariance
matrix.
Args:
T (float): `T` > 0. The domain is [0, `T`].
kernel (str): Name of the kernel function. "RBF" (radial-basis function kernel,
squared-exponential kernel, Gaussian kernel), "AE"
(absolute exponential kernel), or "ExpSineSquared" (Exp-Sine-Squared kernel,
periodic kernel).
length_scale (float): The length scale of the kernel.
N (int): The size of the covariance matrix.
interp (str): The interpolation to interpolate the random function. "linear",
"quadratic", or "cubic".
"""
def __init__(
self,
T: float = 1,
kernel: str = "RBF",
length_scale: float = 1,
N: int = 1000,
interp: str = "cubic"
):
if not sklearn_installed:
raise ImportError("scikit-learn is required to use GRF. "
"Please install it via 'pip install scikit-learn'.")
self.N = N
self.interp = interp
self.x = np.linspace(0, T, num=N)[:, None]
if kernel == "RBF":
K = gp.kernels.RBF(length_scale=length_scale)
elif kernel == "AE":
K = gp.kernels.Matern(length_scale=length_scale, nu=0.5)
elif kernel == "ExpSineSquared":
K = gp.kernels.ExpSineSquared(length_scale=length_scale, periodicity=T)
self.K = K(self.x)
self.L = np.linalg.cholesky(self.K + 1e-13 * np.eye(self.N))
[docs]
def random(self, size):
u = np.random.randn(self.N, size)
return np.dot(self.L, u).T
[docs]
def eval_one(self, feature, x):
if self.interp == "linear":
return np.interp(x, np.ravel(self.x), feature)
f = interpolate.interp1d(np.ravel(self.x),
feature,
kind=self.interp,
copy=False,
assume_sorted=True)
return f(x)
[docs]
def eval_batch(self, features, xs):
if self.interp == "linear":
return np.vstack([np.interp(xs, np.ravel(self.x), y).T for y in features])
res = map(
lambda y: interpolate.interp1d(np.ravel(self.x),
y,
kind=self.interp,
copy=False,
assume_sorted=True)(xs).T,
features,
)
return np.vstack(list(res)).astype(brainstate.environ.dftype())
[docs]
class GRF_KL(FunctionSpace):
"""Gaussian random field (Gaussian process) in 1D.
The random sampling algorithm is based on truncated Karhunen-Loeve (KL) expansion.
Args:
T (float): `T` > 0. The domain is [0, `T`].
kernel (str): The kernel function. "RBF" (radial-basis function) or "AE"
(absolute exponential).
length_scale (float): The length scale of the kernel.
num_eig (int): The number of eigenfunctions in KL expansion to be kept.
N (int): Each eigenfunction is discretized at `N` points in [0, `T`].
interp (str): The interpolation to interpolate the random function. "linear",
"quadratic", or "cubic".
"""
def __init__(
self,
T: float = 1,
kernel: str = "RBF",
length_scale: float = 1,
num_eig: int = 10,
N: int = 100,
interp: str = "cubic"
):
if not sklearn_installed:
raise ImportError("scikit-learn is required to use GRF_KL. "
"Please install it via 'pip install scikit-learn'.")
if not isclose(T, 1):
raise ValueError("GRF_KL only supports T = 1.")
self.num_eig = num_eig
if kernel == "RBF":
kernel = gp.kernels.RBF(length_scale=length_scale)
elif kernel == "AE":
kernel = gp.kernels.Matern(length_scale=length_scale, nu=0.5)
eigval, eigvec = eig(kernel, num_eig, N, eigenfunction=True)
eigvec *= eigval ** 0.5
x = np.linspace(0, T, num=N)
self.eigfun = [
interpolate.interp1d(x, y, kind=interp, copy=False, assume_sorted=True)
for y in eigvec.T
]
[docs]
def bases(self, sensors):
"""Evaluate the eigenfunctions at a list of points `sensors`."""
return np.array([np.ravel(f(sensors)) for f in self.eigfun])
[docs]
def random(self, size):
return np.random.randn(size, self.num_eig).astype(brainstate.environ.dftype())
[docs]
def eval_one(self, feature, x):
eigfun = [f(x) for f in self.eigfun]
return np.sum(eigfun * feature)
[docs]
def eval_batch(self, features, xs):
eigfun = np.array([np.ravel(f(xs)) for f in self.eigfun],
dtype=brainstate.environ.dftype())
return np.dot(features, eigfun)
[docs]
class GRF2D(FunctionSpace):
"""Gaussian random field in [0, 1]x[0, 1].
The random sampling algorithm is based on Cholesky decomposition of the covariance
matrix.
Args:
kernel (str): The kernel function. "RBF" (radial-basis function) or "AE"
(absolute exponential).
length_scale (float): The length scale of the kernel.
N (int): The size of the covariance matrix.
interp (str): The interpolation to interpolate the random function. "linear" or
"splinef2d".
Example:
.. code-block:: python
space = pinnx.fnspace.GRF2D(length_scale=0.1)
features = space.random(3)
x = np.linspace(0, 1, num=500)
y = np.linspace(0, 1, num=500)
xv, yv = np.meshgrid(x, y)
sensors = np.vstack((np.ravel(xv), np.ravel(yv))).T
u = space.eval_batch(features, sensors)
for ui in u:
plt.figure()
plt.imshow(np.reshape(ui, (len(y), len(x))))
plt.colorbar()
plt.show()
"""
def __init__(
self,
kernel: str = "RBF",
length_scale: float = 1,
N: int = 100,
interp: str = "splinef2d"
):
if not sklearn_installed:
raise ImportError("scikit-learn is required to use GRF2D. "
"Please install it via 'pip install scikit-learn'.")
self.N = N
self.interp = interp
self.x = np.linspace(0, 1, num=N)
self.y = np.linspace(0, 1, num=N)
xv, yv = np.meshgrid(self.x, self.y)
self.X = np.vstack((np.ravel(xv), np.ravel(yv))).T
if kernel == "RBF":
K = gp.kernels.RBF(length_scale=length_scale)
elif kernel == "AE":
K = gp.kernels.Matern(length_scale=length_scale, nu=0.5)
self.K = K(self.X)
self.L = np.linalg.cholesky(self.K + 1e-12 * np.eye(self.N ** 2))
[docs]
def random(self, size):
u = np.random.randn(self.N ** 2, size)
return np.dot(self.L, u).T
[docs]
def eval_one(self, feature, x):
y = np.reshape(feature, (self.N, self.N))
return interpolate.interpn((self.x, self.y), y, x, method=self.interp)[0]
[docs]
def eval_batch(self, features, xs):
points = (self.x, self.y)
ys = np.reshape(features, (-1, self.N, self.N))
res = map(lambda y: interpolate.interpn(points, y, xs, method=self.interp), ys)
return np.vstack(list(res)).astype(brainstate.environ.dftype())
[docs]
def wasserstein2(space1, space2):
"""Compute 2-Wasserstein (W2) metric to measure the distance between two ``GRF``."""
return (
np.trace(space1.K + space2.K - 2 * linalg.sqrtm(space1.K @ space2.K)) ** 0.5
/ space1.N ** 0.5
)
def eig(kernel, num, Nx, eigenfunction=True):
"""Compute the eigenvalues and eigenfunctions of a kernel function in [0, 1]."""
h = 1 / (Nx - 1)
c = kernel(np.linspace(0, 1, num=Nx)[:, None])[0] * h
A = np.empty((Nx, Nx))
for i in range(Nx):
A[i, i:] = c[: Nx - i]
A[i, i::-1] = c[: i + 1]
A[:, 0] *= 0.5
A[:, -1] *= 0.5
if not eigenfunction:
return np.flipud(np.sort(np.real(np.linalg.eigvals(A))))[:num]
eigval, eigvec = np.linalg.eig(A)
eigval, eigvec = np.real(eigval), np.real(eigvec)
idx = np.flipud(np.argsort(eigval))[:num]
eigval, eigvec = eigval[idx], eigvec[:, idx]
for i in range(num):
eigvec[:, i] /= np.trapz(eigvec[:, i] ** 2, dx=h) ** 0.5
return eigval, eigvec