# Source code for limix.qc._covariance

[docs]def normalise_covariance(K, out=None): """ Variance rescaling of covariance matrix πΊ. Let n be the number of rows (or columns) of πΊ and let mα΅’ be the average of the values in the i-th column. Gower rescaling is defined as .. math:: πΊ(n - 1)/(πππππ(πΊ) - βmα΅’). Notes ----- The reasoning of the scaling is as follows. Let π  be a vector of n independent samples and let π² be the Gower's centering matrix. The unbiased variance estimator is .. math:: v = β (gα΅’-αΈ‘)Β²/(n-1) = πππππ((π -αΈ‘π)α΅(π -αΈ‘π))/(n-1) = πππππ(π²π π α΅π²)/(n-1) Let πΊ be the covariance matrix of π . The expectation of the unbiased variance estimator is .. math:: π[v] = πππππ(π²π[π π α΅]π²)/(n-1) = πππππ(π²πΊπ²)/(n-1), assuming that π[gα΅’]=0. We thus divide πΊ by π[v] to achieve an unbiased normalisation on the random variable gα΅’. Parameters ---------- K : array_like Covariance matrix to be normalised. out : array_like, optional Result destination. Defaults to None. Examples -------- .. doctest:: >>> from numpy import dot, mean, zeros >>> from numpy.random import RandomState >>> from limix.qc import normalise_covariance >>> >>> random = RandomState(0) >>> X = random.randn(10, 10) >>> K = dot(X, X.T) >>> Z = random.multivariate_normal(zeros(10), K, 500) >>> print("%.3f" % mean(Z.var(1, ddof=1))) 9.824 >>> Kn = normalise_covariance(K) >>> Zn = random.multivariate_normal(zeros(10), Kn, 500) >>> print("%.3f" % mean(Zn.var(1, ddof=1))) 1.008 .. _Dask: https://dask.pydata.org/ """ from numpy import asarray import dask.array as da from pandas import DataFrame import xarray as xr if isinstance(K, DataFrame): K = K.astype(float) trace = K.values.trace() elif isinstance(K, da.Array): trace = da.diag(K).sum() elif isinstance(K, xr.DataArray): trace = da.diag(K.data).sum() else: K = asarray(K, float) trace = K.trace() c = asarray((K.shape[0] - 1) / (trace - K.mean(axis=0).sum()), float) if out is None: return K * c _copyto(out, K) _inplace_mult(out, c) return out
def _copyto(dst, src): from numpy import copyto, ndarray import dask.array as da from pandas import DataFrame if isinstance(dst, DataFrame): copyto(dst.values, src) elif isinstance(dst, ndarray) and isinstance(src, da.Array): copyto(dst, src.compute()) else: copyto(dst, src) def _inplace_mult(out, c): import dask.array as da if isinstance(c, da.Array): c = c.compute() out *= c