Source code for stat_utils

import numpy as np
import scipy as sp
from scipy import stats

import chippr
from chippr import defaults as d
from chippr import utils as u

def mean(population):
    """
    Calculates the mean of a population

    Parameters
    ----------
    population: np.array, float
        population over which to calculate the mean

    Returns
    -------
    mean: np.array, float
        mean value over population
    """
    shape = np.shape(population)
    flat = population.reshape(np.prod(shape[:-1]), shape[-1])
    mean = np.mean(flat, axis=0)
    return mean
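
# Usage sketch (illustrative, not part of the original source): `mean`
# collapses all leading axes, so chains of shape (n_walkers, n_samples,
# n_bins) are averaged down to one value per bin:
#
#     population = np.random.normal(size=(4, 25, 10))
#     bin_means = mean(population)   # shape (10,)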

def norm_fit(population):
    """
    Calculates the mean and standard deviation of a population

    Parameters
    ----------
    population: np.array, float
        population over which to calculate the mean and standard deviation

    Returns
    -------
    norm_stats: tuple, list, float
        mean and standard deviation over population
    """
    shape = np.shape(population)
    flat = population.reshape(np.prod(shape[:-1]), shape[-1]).T
    locs, scales = [], []
    for k in range(shape[-1]):
        (loc, scale) = sp.stats.norm.fit_loc_scale(flat[k])
        locs.append(loc)
        scales.append(scale)
    locs = np.array(locs)
    scales = np.array(scales)
    norm_stats = (locs, scales)
    return norm_stats
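
# Usage sketch (illustrative, not part of the original source): for a
# synthetic Gaussian population, the fitted locations and scales should
# recover the generating parameters, one pair per bin:
#
#     population = np.random.normal(loc=2., scale=0.5, size=(1000, 3))
#     locs, scales = norm_fit(population)   # each of shape (3,);
#                                           # locs near 2., scales near 0.5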

def calculate_kld(pe, qe, vb=True):
    """
    Calculates the Kullback-Leibler Divergence between two PDFs.

    Parameters
    ----------
    pe: numpy.ndarray, float
        probability distribution evaluated on a grid whose distance from `q`
        will be calculated.
    qe: numpy.ndarray, float
        probability distribution evaluated on a grid whose distance to `p`
        will be calculated.
    vb: boolean
        report on progress to stdout?

    Returns
    -------
    Dpq: float
        the value of the Kullback-Leibler Divergence from `q` to `p`
    """
    # Normalize the evaluations, so that the integrals can be done
    # (very approximately!) by simple summation:
    pn = pe / np.sum(pe)
    qn = qe / np.sum(qe)
    # Compute the log of the normalized PDFs
    logp = u.safe_log(pn)
    logq = u.safe_log(qn)
    # Calculate the KLD from q to p
    Dpq = np.sum(pn * (logp - logq))
    return Dpq
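
# Usage sketch (illustrative, not part of the original source): for two
# unit-variance Gaussians offset by 0.5, the analytic KLD is 0.125 nats,
# which the gridded sum should approximate closely:
#
#     grid = np.linspace(-5., 5., 100)
#     p = sp.stats.norm(0., 1.).pdf(grid)
#     q = sp.stats.norm(0.5, 1.).pdf(grid)
#     kld = calculate_kld(p, q)   # ~0.125; 0. only when p and q coincide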

def calculate_rms(pe, qe, vb=True):
    """
    Calculates the Root Mean Square Error between two PDFs.

    Parameters
    ----------
    pe: numpy.ndarray, float
        probability distribution evaluated on a grid whose distance _from_
        `q` will be calculated.
    qe: numpy.ndarray, float
        probability distribution evaluated on a grid whose distance _to_ `p`
        will be calculated.
    vb: boolean
        report on progress to stdout?

    Returns
    -------
    rms: float
        the value of the RMS error between `q` and `p`
    """
    npoints = len(pe)
    assert len(pe) == len(qe)
    # Calculate the RMS between p and q
    rms = np.sqrt(np.sum((pe - qe) ** 2) / npoints)
    return rms
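
# Usage sketch (illustrative, not part of the original source): unlike the
# KLD, the RMS error is symmetric in its arguments:
#
#     grid = np.linspace(-5., 5., 100)
#     p = sp.stats.norm(0., 1.).pdf(grid)
#     q = sp.stats.norm(0.5, 1.).pdf(grid)
#     rms = calculate_rms(p, q)   # small positive number;
#                                 # calculate_rms(p, p) is exactly 0.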

def single_parameter_gr_stat(chain):
    """
    Calculates the Gelman-Rubin test statistic of convergence of an MCMC
    chain over one parameter

    Parameters
    ----------
    chain: numpy.ndarray, float
        single-parameter chain of shape (n_walkers, n_iterations)

    Returns
    -------
    R_hat: float
        potential scale reduction factor
    """
    # Within-chain variance W: mean of the per-walker sample variances
    ssq = np.var(chain, axis=1, ddof=1)
    W = np.mean(ssq, axis=0)
    # Between-chain variance B: spread of the per-walker means around the
    # grand mean
    xb = np.mean(chain, axis=1)
    xbb = np.mean(xb, axis=0)
    m = chain.shape[0]
    n = chain.shape[1]
    B = n / (m - 1.) * np.sum((xbb - xb)**2., axis=0)
    # Pooled variance estimate and the resulting potential scale reduction
    var_x = (n - 1.) / n * W + 1. / n * B
    R_hat = np.sqrt(var_x / W)
    return R_hat
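
# Usage sketch (illustrative, not part of the original source): the input
# is one parameter's chain of shape (n_walkers, n_iterations); a well-mixed
# stationary chain gives R_hat close to 1:
#
#     chain = np.random.normal(size=(10, 1000))
#     r_hat = single_parameter_gr_stat(chain)   # ~1. for this white noise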

def multi_parameter_gr_stat(sample):
    """
    Calculates the Gelman-Rubin test statistic of convergence of an MCMC
    chain over multiple parameters

    Parameters
    ----------
    sample: numpy.ndarray, float
        multi-parameter chain output of shape (n_walkers, n_iterations,
        n_params)

    Returns
    -------
    Rs: numpy.ndarray, float
        vector of the potential scale reduction factors
    """
    dims = np.shape(sample)
    (n_walkers, n_iterations, n_params) = dims
    # Discard the first half of each chain as burn-in; integer division is
    # needed so the result can be used as an index
    n_burn_ins = n_iterations // 2
    chain_ensemble = np.swapaxes(sample, 0, 1)
    chain_ensemble = chain_ensemble[n_burn_ins:, :]
    Rs = np.zeros((n_params))
    for i in range(n_params):
        chains = chain_ensemble[:, :, i].T
        Rs[i] = single_parameter_gr_stat(chains)
    return Rs
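
# Usage sketch (illustrative, not part of the original source): the input
# is of shape (n_walkers, n_iterations, n_params), as unpacked above; the
# first half of each chain is discarded before R_hat is computed per
# parameter:
#
#     sample = np.random.normal(size=(10, 1000, 5))
#     rs = multi_parameter_gr_stat(sample)   # shape (5,), entries ~1.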

def gr_test(sample, threshold=d.gr_threshold):
    """
    Performs the Gelman-Rubin test of convergence of an MCMC chain

    Parameters
    ----------
    sample: numpy.ndarray, float
        chain output
    threshold: float, optional
        Gelman-Rubin test statistic criterion (usually around 1)

    Returns
    -------
    test_result: boolean
        True if burning in, False if post-burn-in
    """
    gr = multi_parameter_gr_stat(sample)
    print('Gelman-Rubin test statistic = ' + str(gr))
    test_result = np.max(gr) > threshold
    return test_result
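
# Usage sketch (illustrative, not part of the original source; the 1.1
# threshold is an assumed value, not the package default d.gr_threshold):
#
#     sample = np.random.normal(size=(10, 1000, 5))
#     still_burning = gr_test(sample, threshold=1.1)   # prints the
#                                                      # statistic; False
#                                                      # for well-mixed noise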

def cft(xtimes, lag):  # xtimes has ntimes elements
    """
    Helper function to calculate autocorrelation time for chain of MCMC
    samples

    Parameters
    ----------
    xtimes: numpy.ndarray, float
        single parameter values for a single walker over all iterations
    lag: int
        lag time in number of iterations

    Returns
    -------
    ans: numpy.ndarray, float
        lagged products for one time lag for one parameter of one walker
    """
    lent = len(xtimes) - lag
    allt = range(lent)
    ans = np.array([xtimes[t + lag] * xtimes[t] for t in allt])
    return ans

def cf(xtimes):  # xtimes has ntimes elements
    """
    Helper function to calculate autocorrelation time for chain of MCMC
    samples

    Parameters
    ----------
    xtimes: numpy.ndarray, float
        single parameter values for a single walker over all iterations

    Returns
    -------
    cf: numpy.ndarray, float
        autocorrelation over all time lags for one parameter of one walker
    """
    cf0 = np.dot(xtimes, xtimes)
    # Integer division so the lags and the slice index remain valid indices
    allt = range(len(xtimes) // 2)
    cf = np.array([sum(cft(xtimes, lag)[len(xtimes) // 2:])
                   for lag in allt]) / cf0
    return cf

def cfs(x, mode):  # xbinstimes has nbins by ntimes elements
    """
    Helper function for calculating autocorrelation time for MCMC chains

    Parameters
    ----------
    x: numpy.ndarray, float
        input parameter values of length number of iterations by number of
        walkers if mode='walkers' or dimension of parameters if mode='bins'
    mode: string
        'bins' for one autocorrelation time per parameter, 'walkers' for one
        autocorrelation time per walker

    Returns
    -------
    cfs: numpy.ndarray, float
        autocorrelation times for all walkers if mode='walkers' or all
        parameters if mode='bins'
    """
    if mode == 'walkers':
        xbinstimes = x
        cfs = np.array([sum(cf(xtimes))
                        for xtimes in xbinstimes]) / len(xbinstimes)
    if mode == 'bins':
        xwalkerstimes = x
        cfs = np.array([sum(cf(xtimes))
                        for xtimes in xwalkerstimes]) / len(xwalkerstimes)
    return cfs

def acors(xtimeswalkersbins, mode='bins'):
    """
    Calculates autocorrelation time for MCMC chains

    Parameters
    ----------
    xtimeswalkersbins: numpy.ndarray, float
        emcee chain values of dimensions (n_iterations, n_walkers,
        n_parameters)
    mode: string, optional
        'bins' for one autocorrelation time per parameter, 'walkers' for one
        autocorrelation time per walker

    Returns
    -------
    taus: numpy.ndarray, float
        autocorrelation times by bin or by walker depending on mode
    """
    if mode == 'walkers':
        # nwalkers by nbins by nsteps
        xwalkersbinstimes = np.swapaxes(xtimeswalkersbins, 1, 2)
        taus = np.array([1. + 2. * sum(cfs(xbinstimes, mode))
                         for xbinstimes in xwalkersbinstimes])
    if mode == 'bins':
        # nbins by nwalkers by nsteps
        xbinswalkerstimes = xtimeswalkersbins.T
        taus = np.array([1. + 2. * sum(cfs(xwalkerstimes, mode))
                         for xwalkerstimes in xbinswalkerstimes])
    return taus
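
# Usage sketch (illustrative, not part of the original source): for emcee
# chain values of shape (n_iterations, n_walkers, n_params), mode='bins'
# yields one autocorrelation time per parameter, exercising the cf/cft/cfs
# helpers above:
#
#     chain = np.random.normal(size=(400, 10, 3))
#     taus = acors(chain, mode='bins')   # shape (3,)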