Source code for glimix_core.lmm._lmm

from math import exp

from numpy import (
    asarray,
    atleast_2d,
    concatenate,
    dot,
    full,
    log,
    maximum,
    sum as npsum,
    zeros,
)
from numpy.linalg import inv, lstsq, slogdet
from numpy_sugar import epsilon

from glimix_core._util import cache, log2pi
from optimix import Function, Scalar

from .._util import economic_qs_zeros, numbers
from ._lmm_scan import FastScanner


[docs]class LMM(Function): r""" Fast Linear Mixed Models inference via maximum likelihood. Examples -------- .. doctest:: >>> from numpy import array >>> from numpy_sugar.linalg import economic_qs_linear >>> from glimix_core.lmm import LMM >>> >>> X = array([[1, 2], [3, -1]], float) >>> QS = economic_qs_linear(X) >>> covariates = array([[1], [1]]) >>> y = array([-1, 2], float) >>> lmm = LMM(y, covariates, QS) >>> lmm.fit(verbose=False) >>> print('%.3f' % lmm.lml()) -3.649 One can also specify which parameters should be fitted: .. doctest:: >>> from numpy import array >>> from numpy_sugar.linalg import economic_qs_linear >>> from glimix_core.lmm import LMM >>> >>> X = array([[1, 2], [3, -1]], float) >>> QS = economic_qs_linear(X) >>> covariates = array([[1], [1]]) >>> y = array([-1, 2], float) >>> lmm = LMM(y, covariates, QS) >>> lmm.fix('delta') >>> lmm.fix('scale') >>> lmm.delta = 0.5 >>> lmm.scale = 1 >>> lmm.fit(verbose=False) >>> print('%.3f' % lmm.lml()) -3.832 >>> lmm.unfix('delta') >>> lmm.fit(verbose=False) >>> print('%.3f' % lmm.lml()) -3.713 Notes ----- The LMM model can be equivalently written as :: 𝐲 ∼ 𝓝(X𝜷, s((1-𝛿)K + 𝛿I)), and we thus have v₀ = s (1 - 𝛿) and v₁ = s 𝛿. Consider the economic eigendecomposition of K: .. math:: \overbrace{[\mathrm Q₀ \quad \mathrm Q₁]}^{\mathrm Q} \overbrace{\left[\begin{array}{cc} \mathrm S₀ & 𝟎\\ 𝟎 & 𝟎 \end{array}\right]}^{\mathrm S} \left[\begin{array}{c} \mathrm Q₀ᵀ \\ \mathrm Q₁ᵀ \end{array}\right] = \mathrm K and let .. math:: \mathrm D = \left[ \begin{array}{cc} (1-𝛿)\mathrm S₀ + 𝛿\mathrm I & 𝟎\\ 𝟎 & 𝛿\mathrm I \end{array} \right]. We thus have :: ((1-𝛿)K + 𝛿I)⁻¹ = QD⁻¹Qᵀ. A diagonal covariance-matrix can then be used to define an equivalent marginal likelihood:: 𝓝(Qᵀ𝐲|QᵀX𝜷, sD). """ def __init__(self, y, X, QS=None, restricted=False): """ Constructor. Parameters ---------- y : array_like Outcome. X : array_like Covariates as a two-dimensional array. QS : tuple Economic eigendecompositon in form of ``((Q0, Q1), S0)`` of a covariance matrix ``K``. restricted : bool ``True`` for restricted maximum likelihood optimization; ``False`` otherwise. Defaults to ``False``. """ from numpy_sugar import is_all_finite from numpy_sugar.linalg import ddot, economic_svd logistic = Scalar(0.0) logistic.listen(self._delta_update) logistic.bounds = (-numbers.logmax, +numbers.logmax) Function.__init__(self, "LMM", logistic=logistic) self._logistic = logistic y = asarray(y, float).ravel() if not is_all_finite(y): raise ValueError("There are non-finite values in the outcome.") if len(y) == 0: raise ValueError("The outcome array is empty.") X = atleast_2d(asarray(X, float).T).T if not is_all_finite(X): raise ValueError("There are non-finite values in the covariates matrix.") self._optimal = {"beta": False, "scale": False} if QS is None: QS = economic_qs_zeros(len(y)) self.delta = 1.0 logistic.fix() else: self.delta = 0.5 if QS[0][0].shape[0] != len(y): msg = "Sample size differs between outcome and covariance decomposition." raise ValueError(msg) if y.shape[0] != X.shape[0]: msg = "Sample size differs between outcome and covariates." raise ValueError(msg) self._Darr = [] n = y.shape[0] d = self.delta if QS[1].size > 0: self._Darr += [QS[1] * (1 - d) + d] if QS[1].size < n: self._Darr += [full(n - QS[1].size, d)] self._y = y self._QS = QS SVD = economic_svd(X) self._X = {"X": X, "tX": ddot(SVD[0], SVD[1]), "VT": SVD[2]} self._tbeta = zeros(len(SVD[1])) self._scale = 1.0 self._fix = {"beta": False, "scale": False} self._restricted = restricted @property def beta(self): """ Fixed-effect sizes. Returns ------- effect-sizes : numpy.ndarray Optimal fixed-effect sizes. Notes ----- Setting the derivative of log(p(𝐲)) over effect sizes equal to zero leads to solutions 𝜷 from equation :: (QᵀX)ᵀD⁻¹(QᵀX)𝜷 = (QᵀX)ᵀD⁻¹(Qᵀ𝐲). """ from numpy_sugar.linalg import rsolve return rsolve(self._X["VT"], rsolve(self._X["tX"], self.mean())) @beta.setter def beta(self, beta): beta = asarray(beta, float).ravel() self._tbeta[:] = self._X["VT"] @ beta self._optimal["beta"] = False self._optimal["scale"] = False @property def beta_covariance(self): """ Estimates the covariance-matrix of the optimal beta. Returns ------- beta-covariance : ndarray (Xᵀ(s((1-𝛿)K + 𝛿I))⁻¹X)⁻¹. References ---------- .. Rencher, A. C., & Schaalje, G. B. (2008). Linear models in statistics. John Wiley & Sons. """ from numpy_sugar.linalg import ddot tX = self._X["tX"] Q = concatenate(self._QS[0], axis=1) S0 = self._QS[1] D = self.v0 * S0 + self.v1 D = D.tolist() + [self.v1] * (len(self._y) - len(D)) D = asarray(D) A = inv(tX.T @ (Q @ ddot(1 / D, Q.T @ tX))) VT = self._X["VT"] H = lstsq(VT, A, rcond=None)[0] return lstsq(VT, H.T, rcond=None)[0]
[docs] def fix(self, param): """ Disable parameter optimization. Parameters ---------- param : str Possible values are ``"delta"``, ``"beta"``, and ``"scale"``. """ if param == "delta": super()._fix("logistic") else: self._fix[param] = True
[docs] def unfix(self, param): """ Enable parameter optimization. Parameters ---------- param : str Possible values are ``"delta"``, ``"beta"``, and ``"scale"``. """ if param == "delta": self._unfix("logistic") else: self._fix[param] = False
@property def v0(self): """ First variance. Returns ------- v0 : float s(1 - 𝛿). """ return self.scale * (1 - self.delta) @property def v1(self): """ Second variance. Returns ------- v1 : float s𝛿. """ return self.scale * self.delta
[docs] def fit(self, verbose=True): """ Maximise the marginal likelihood. Parameters ---------- verbose : bool, optional ``True`` for progress output; ``False`` otherwise. Defaults to ``True``. """ if not self._isfixed("logistic"): self._maximize_scalar(desc="LMM", rtol=1e-6, atol=1e-6, verbose=verbose) if not self._fix["beta"]: self._update_beta() if not self._fix["scale"]: self._update_scale()
[docs] def get_fast_scanner(self): """ Return :class:`.FastScanner` for association scan. Returns ------- fast-scanner : :class:`.FastScanner` Instance of a class designed to perform very fast association scan. """ v0 = self.v0 v1 = self.v1 QS = (self._QS[0], v0 * self._QS[1]) return FastScanner(self._y, self.X, QS, v1)
[docs] def value(self): """ Internal use only. """ if not self._fix["beta"]: self._update_beta() if not self._fix["scale"]: self._update_scale() return self.lml()
[docs] def gradient(self): """ Not implemented. """ raise NotImplementedError
@property def nsamples(self): """ Number of samples, n. """ return len(self._y) @property def ncovariates(self): """ Number of covariates, c. """ return self._X["X"].shape[1]
[docs] def lml(self): """ Log of the marginal likelihood. Returns ------- lml : float Log of the marginal likelihood. Notes ----- The log of the marginal likelihood is given by :: 2⋅log(p(𝐲)) = -n⋅log(2π) - n⋅log(s) - log|D| - (Qᵀ𝐲)ᵀs⁻¹D⁻¹(Qᵀ𝐲) + (Qᵀ𝐲)ᵀs⁻¹D⁻¹(QᵀX𝜷)/2 - (QᵀX𝜷)ᵀs⁻¹D⁻¹(QᵀX𝜷). By using the optimal 𝜷, the log of the marginal likelihood can be rewritten as:: 2⋅log(p(𝐲)) = -n⋅log(2π) - n⋅log(s) - log|D| + (Qᵀ𝐲)ᵀs⁻¹D⁻¹Qᵀ(X𝜷-𝐲). In the extreme case where 𝜷 is such that 𝐲 = X𝜷, the maximum is attained as s→0. For optimals 𝜷 and s, the log of the marginal likelihood can be further simplified to :: 2⋅log(p(𝐲; 𝜷, s)) = -n⋅log(2π) - n⋅log s - log|D| - n. """ reml = (self._logdetXX() - self._logdetH()) / 2 if self._optimal["scale"]: lml = self._lml_optimal_scale() else: lml = self._lml_arbitrary_scale() return lml + reml
@property def X(self): """ Covariates matrix. Returns ------- X : ndarray Covariates. """ return self._X["X"] @property def delta(self): """ Variance ratio between ``K`` and ``I``. """ v = float(self._logistic.value) if v > 0.0: v = 1 / (1 + exp(-v)) else: v = exp(v) v = v / (v + 1.0) return min(max(v, epsilon.tiny), 1 - epsilon.tiny) @delta.setter def delta(self, delta): delta = min(max(delta, epsilon.tiny), 1 - epsilon.tiny) self._logistic.value = log(delta / (1 - delta)) self._optimal["beta"] = False self._optimal["scale"] = False @property def scale(self): """ Scaling factor. Returns ------- scale : float Scaling factor. Notes ----- Setting the derivative of log(p(𝐲; 𝜷)), for which 𝜷 is optimal, over scale equal to zero leads to the maximum :: s = n⁻¹(Qᵀ𝐲)ᵀD⁻¹ Qᵀ(𝐲-X𝜷). In the case of restricted marginal likelihood :: s = (n-c)⁻¹(Qᵀ𝐲)ᵀD⁻¹ Qᵀ(𝐲-X𝜷), where s is the number of covariates. """ return self._scale @scale.setter def scale(self, scale): self._scale = scale self._optimal["scale"] = False
[docs] def mean(self): """ Mean of the prior. Formally, 𝐦 = X𝜷. Returns ------- mean : ndarray Mean of the prior. """ return self._X["tX"] @ self._tbeta
[docs] def covariance(self): """ Covariance of the prior. Returns ------- covariance : ndarray v₀K + v₁I. """ from numpy_sugar.linalg import ddot, sum2diag Q0 = self._QS[0][0] S0 = self._QS[1] return sum2diag(dot(ddot(Q0, self.v0 * S0), Q0.T), self.v1)
def _delta_update(self): self._optimal["beta"] = False self._optimal["scale"] = False self._Dcache = None @cache def _logdetXX(self): """ log(|XᵀX|). """ if not self._restricted: return 0.0 ldet = slogdet(self._X["tX"].T @ self._X["tX"]) if ldet[0] != 1.0: raise ValueError("The determinant of XᵀX should be positive.") return ldet[1] def _logdetH(self): """ log(|H|) for H = s⁻¹XᵀQD⁻¹QᵀX. """ if not self._restricted: return 0.0 ldet = slogdet(sum(self._XTQDiQTX) / self.scale) if ldet[0] != 1.0: raise ValueError("The determinant of H should be positive.") return ldet[1] def _lml_optimal_scale(self): """ Log of the marginal likelihood for optimal scale. Implementation for unrestricted LML:: Returns ------- lml : float Log of the marginal likelihood. """ assert self._optimal["scale"] n = len(self._y) lml = -self._df * log2pi - self._df - n * log(self.scale) lml -= sum(npsum(log(D)) for D in self._D) return lml / 2 def _lml_arbitrary_scale(self): """ Log of the marginal likelihood for arbitrary scale. Returns ------- lml : float Log of the marginal likelihood. """ s = self.scale D = self._D n = len(self._y) lml = -self._df * log2pi - n * log(s) lml -= sum(npsum(log(d)) for d in D) d = (mTQ - yTQ for (mTQ, yTQ) in zip(self._mTQ, self._yTQ)) lml -= sum((i / j) @ i for (i, j) in zip(d, D)) / s return lml / 2 @property def _df(self): """ Degrees of freedom. """ if not self._restricted: return self.nsamples return self.nsamples - self._X["tX"].shape[1] def _optimal_scale_using_optimal_beta(self): from numpy_sugar import epsilon assert self._optimal["beta"] yTQDiQTy = self._yTQDiQTy yTQDiQTm = self._yTQDiQTX s = sum(i - j @ self._tbeta for (i, j) in zip(yTQDiQTy, yTQDiQTm)) return maximum(s / self._df, epsilon.small) def _update_beta(self): from numpy_sugar.linalg import rsolve assert not self._fix["beta"] if self._optimal["beta"]: return yTQDiQTm = list(self._yTQDiQTX) mTQDiQTm = list(self._XTQDiQTX) nominator = yTQDiQTm[0] denominator = mTQDiQTm[0] if len(yTQDiQTm) > 1: nominator += yTQDiQTm[1] denominator += mTQDiQTm[1] self._tbeta[:] = rsolve(denominator, nominator) self._optimal["beta"] = True self._optimal["scale"] = False def _update_scale(self): from numpy_sugar import epsilon if self._optimal["beta"]: self._scale = self._optimal_scale_using_optimal_beta() else: yTQDiQTy = self._yTQDiQTy yTQDiQTm = self._yTQDiQTX b = self._tbeta p0 = sum(i - 2 * j @ b for (i, j) in zip(yTQDiQTy, yTQDiQTm)) p1 = sum((b @ i) @ b for i in self._XTQDiQTX) self._scale = maximum((p0 + p1) / self._df, epsilon.small) self._optimal["scale"] = True @property def _D(self): if self._Dcache is None: i = 0 d = self.delta if self._QS[1].size > 0: self._Darr[i][:] = self._QS[1] self._Darr[i] *= 1 - d self._Darr[i] += d i += 1 if self._QS[1].size < self._y.shape[0]: self._Darr[i][:] = d self._Dcache = self._Darr return self._Dcache @property def _XTQDiQTX(self): return (i / j @ i.T for (i, j) in zip(self._tXTQ, self._D)) @property def _mTQ(self): return (self.mean().T @ Q for Q in self._QS[0] if Q.size > 0) @property def _tXTQ(self): return (self._X["tX"].T @ Q for Q in self._QS[0] if Q.size > 0) @property def _XTQ(self): return (self._X["tX"].T @ Q for Q in self._QS[0] if Q.size > 0) @property def _yTQ(self): return (self._y.T @ Q for Q in self._QS[0] if Q.size > 0) @property def _yTQQTy(self): return (yTQ ** 2 for yTQ in self._yTQ) @property def _yTQDiQTy(self): return (npsum(i / j) for (i, j) in zip(self._yTQQTy, self._D)) @property def _yTQDiQTX(self): yTQ = self._yTQ D = self._D tXTQ = self._tXTQ return (i / j @ l.T for (i, j, l) in zip(yTQ, D, tXTQ))