"""Tests for Gaussian process regression fitted with ``ProcessMLE``."""
import collections

import numpy as np
from numpy.testing import assert_allclose, assert_equal
import pandas as pd
import pytest

from statsmodels.compat.platform import PLATFORM_OSX
from statsmodels.regression.process_regression import (
    ProcessMLE, GaussianCovariance)
import statsmodels.tools.numdiff as nd


# Parameters for a test model, with or without additive
# noise.
def model1(noise):
    """
    Return the population parameters of the test model.

    Parameters
    ----------
    noise : bool
        If True, include parameters for the additive white noise
        component; otherwise the noise parameter vector is empty.

    Returns
    -------
    mn_par, sc_par, sm_par, no_par : ndarray
        Mean, scale, smoothing and noise parameter vectors.
    """
    mn_par = np.r_[1, 0, -1, 0]
    sc_par = np.r_[1, 1]
    sm_par = np.r_[0.5, 0.1]

    if noise:
        no_par = np.r_[0.25, 0.25]
    else:
        no_par = np.array([])

    return mn_par, sc_par, sm_par, no_par


def setup1(n, get_model, noise):
    """
    Simulate data from the model whose parameters `get_model` returns.

    Observations are arranged in consecutive groups of five, with time
    values 0, ..., 4 within each group.  The scale, smoothing and noise
    design matrices each hold an intercept column and a standardized
    time column.  Draws come from the global NumPy random state, so the
    caller is responsible for seeding it.

    Parameters
    ----------
    n : int
        Total number of observations; must be a multiple of 5.
    get_model : callable
        Called as ``get_model(noise)``; returns the mean, scale,
        smoothing and noise parameter vectors.
    noise : bool
        If True, add heteroscedastic white noise to the response.

    Returns
    -------
    y, x_mean, x_sc, x_sm, x_no, time, groups
        The response, the four design matrices (`x_no` is None when
        `noise` is False), the time values and the group labels.

    Raises
    ------
    ValueError
        If `n` is not a multiple of 5.
    """
    # groups/time are built from n // 5 blocks of five, so any other n
    # would leave them shorter than the n-row design matrices.
    if n % 5 != 0:
        raise ValueError("n must be a multiple of 5, got %r" % (n,))

    mn_par, sc_par, sm_par, no_par = get_model(noise)

    n_groups = n // 5
    groups = np.kron(np.arange(n_groups), np.ones(5))
    time = np.kron(np.ones(n_groups), np.arange(5))
    time_z = (time - time.mean()) / time.std()

    # NOTE: the order of the random draws below is part of the
    # behaviour relied on by the seeded tests; do not reorder.
    x_mean = np.random.normal(size=(n, len(mn_par)))

    x_sc = np.random.normal(size=(n, len(sc_par)))
    x_sc[:, 0] = 1
    x_sc[:, 1] = time_z

    x_sm = np.random.normal(size=(n, len(sm_par)))
    x_sm[:, 0] = 1
    x_sm[:, 1] = time_z

    mn = np.dot(x_mean, mn_par)
    sc = np.exp(np.dot(x_sc, sc_par))
    sm = np.exp(np.dot(x_sm, sm_par))

    if noise:
        x_no = np.random.normal(size=(n, len(no_par)))
        x_no[:, 0] = 1
        x_no[:, 1] = time_z
        no = np.exp(np.dot(x_no, no_par))
    else:
        x_no = None
        no = None

    y = mn.copy()

    gc = GaussianCovariance()

    # Map each group label to the positions of its observations.
    ix = collections.defaultdict(list)
    for i, g in enumerate(groups):
        ix[g].append(i)

    # Add a correlated process draw to each group via the Cholesky
    # factor of that group's covariance matrix.
    for ii in ix.values():
        c = gc.get_cov(time[ii], sc[ii], sm[ii])
        r = np.linalg.cholesky(c)
        y[ii] += np.dot(r, np.random.normal(size=len(ii)))

    # Additive white noise
    if noise:
        y += no * np.random.normal(size=y.shape)

    return y, x_mean, x_sc, x_sm, x_no, time, groups


def run_arrays(n, get_model, noise):
    """Simulate data with `setup1` and fit ProcessMLE using arrays."""
    y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(n, get_model, noise)

    preg = ProcessMLE(y, x_mean, x_sc, x_sm, x_no, time, groups)

    return preg.fit()


@pytest.mark.slow
@pytest.mark.parametrize("noise", [False, True])
def test_arrays(noise):
    """Fit with the array interface and check estimates and predictions."""
    np.random.seed(8234)

    f = run_arrays(1000, model1, noise)
    mod = f.model

    f.summary()  # Smoke test

    # Compare the parameter estimates to population values.
    epar = np.concatenate(model1(noise))
    assert_allclose(f.params, epar, atol=0.3, rtol=0.3)

    # Test the fitted covariance matrix
    cv = f.covariance(mod.time[0:5], mod.exog_scale[0:5, :],
                      mod.exog_smooth[0:5, :])
    assert_allclose(cv, cv.T)  # Check symmetry
    # eigvalsh is the routine for symmetric matrices and always returns
    # real eigenvalues.
    eigvals = np.linalg.eigvalsh(cv)
    assert np.all(eigvals > 0)  # Check positive definiteness

    # Test predict
    yhat = f.predict()
    assert np.corrcoef(yhat, mod.endog)[0, 1] > 0.2
    yhatm = f.predict(exog=mod.exog)
    assert_equal(yhat, yhatm)
    yhat0 = mod.predict(params=f.params, exog=mod.exog)
    assert_equal(yhat, yhat0)

    # Smoke test t-test
    f.t_test(np.eye(len(f.params)))


def run_formula(n, get_model, noise):
    """
    Simulate data with `setup1` and fit ProcessMLE using formulas.

    Returns
    -------
    f
        The fitted results object.
    df : DataFrame
        The data frame that the model was fit to.
    """
    y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(n, get_model, noise)

    df = pd.DataFrame({
        "y": y,
        "x1": x_mean[:, 0],
        "x2": x_mean[:, 1],
        "x3": x_mean[:, 2],
        "x4": x_mean[:, 3],
        "xsc1": x_sc[:, 0],
        "xsc2": x_sc[:, 1],
        "xsm1": x_sm[:, 0],
        "xsm2": x_sm[:, 1],
        "time": time,
        "groups": groups
    })

    mean_formula = "y ~ 0 + x1 + x2 + x3 + x4"
    scale_formula = "0 + xsc1 + xsc2"
    smooth_formula = "0 + xsm1 + xsm2"

    if noise:
        df["xno1"] = x_no[:, 0]
        df["xno2"] = x_no[:, 1]
        noise_formula = "0 + xno1 + xno2"
    else:
        noise_formula = None

    preg = ProcessMLE.from_formula(
        mean_formula,
        data=df,
        scale_formula=scale_formula,
        smooth_formula=smooth_formula,
        noise_formula=noise_formula,
        time="time",
        groups="groups")
    f = preg.fit()

    return f, df


@pytest.mark.slow
@pytest.mark.parametrize("noise", [False, True])
def test_formulas(noise):
    """Fit with the formula interface and check estimates and predictions."""
    np.random.seed(8789)

    f, df = run_formula(1000, model1, noise)
    mod = f.model

    f.summary()  # Smoke test

    # Compare the parameter estimates to population values.
    epar = np.concatenate(model1(noise))
    assert_allclose(f.params, epar, atol=0.1, rtol=1)

    # Test the fitted covariance matrix
    exog_scale = pd.DataFrame(mod.exog_scale[0:5, :],
                              columns=["xsc1", "xsc2"])
    exog_smooth = pd.DataFrame(mod.exog_smooth[0:5, :],
                               columns=["xsm1", "xsm2"])
    cv = f.covariance(mod.time[0:5], exog_scale, exog_smooth)
    assert_allclose(cv, cv.T)  # Check symmetry
    # eigvalsh is the routine for symmetric matrices and always returns
    # real eigenvalues.
    eigvals = np.linalg.eigvalsh(cv)
    assert np.all(eigvals > 0)  # Check positive definiteness

    # Test predict
    yhat = f.predict()
    assert np.corrcoef(yhat, mod.endog)[0, 1] > 0.2
    yhatm = f.predict(exog=df)
    assert_allclose(yhat, yhatm, rtol=1e-11)
    yhat0 = mod.predict(params=f.params, exog=df)
    assert_allclose(yhat, yhat0, rtol=1e-11)

    # Smoke test t-test
    f.t_test(np.eye(len(f.params)))


# Test the score functions using numerical derivatives.
@pytest.mark.parametrize("noise", [False, True])
def test_score_numdiff(noise):
    """Compare the analytic score with a numerical derivative."""
    # Seed before any random draw so that both the simulated data and
    # the parameter perturbations are reproducible regardless of which
    # tests ran earlier.
    np.random.seed(342)

    y, x_mean, x_sc, x_sm, x_no, time, groups = setup1(1000, model1, noise)

    preg = ProcessMLE(y, x_mean, x_sc, x_sm, x_no, time, groups)

    # Total number of parameters across all model components.
    q = x_mean.shape[1] + x_sc.shape[1] + x_sm.shape[1]
    if noise:
        q += x_no.shape[1]

    atol = 2e-3 if PLATFORM_OSX else 1e-2
    for _ in range(5):
        par0 = preg._get_start()
        par = par0 + 0.1 * np.random.normal(size=q)
        score = preg.score(par)
        score_nd = nd.approx_fprime(par, preg.loglike, epsilon=1e-7)
        assert_allclose(score, score_nd, atol=atol, rtol=1e-4)