"""Running stats module.
This module defines the RunningStats and BlockAverager classes, as well as the
gen_running_average function.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from builtins import object
from six.moves import range
import numpy as np
from scipy import stats
# Running Statistics
[docs]class RunningStats(object):
"""A RunningStats object.
The RunningStats object keeps running statistics for a single
value/quantity.
Attributes:
n (int): The number of points that have pushed to the running
average.
"""
def __init__(self):
"""Initialize the RunningStats object.
"""
self.n=0
self._Mnold = self._Mnnew = self._Snold = self._Snnew = np.zeros(1)[0]
[docs] def push(self, val):
"""Push a new value to the running average.
Args:
val (float): The value to be added to the running average.
Returns:
"""
self.n += 1
if self.n == 1:
self._Mnold = np.array([val])[0]
self._Snold = np.zeros(1)[0]
else:
n = np.array([float(self.n)])[0]
self._Mnnew = self._Mnold + (val - self._Mnold)/(n);
self._Snnew = self._Snold + (val - self._Mnold)*(val-self._Mnnew);
self._Mnold = self._Mnnew;
self._Snold = self._Snnew;
[docs] def mean(self):
"""Return the current mean."""
if self.n == 1:
return self._Mnold
elif self.n > 1:
return self._Mnnew
else:
return 0.0
[docs] def variance(self):
"""Returun the current variance."""
if self.n > 1:
one = np.array([1.0])[0]
n = np.array([float(self.n)])[0]
vary = self._Snnew/(n-one)
return vary
else:
return 0.0
[docs] def deviation(self):
"""Return the current standard deviation."""
# dev = math.sqrt(self.Variance())
dev = np.sqrt(self.variance())
return dev
[docs] def reset(self):
"""Reset the running average."""
self.n = 0
# assumes that a 1d numpy array of floats is pass as input, but
# does not check this
[docs]def gen_running_average(onednparray):
""" Generates a running average
Args:
onednparray (numpy.array): A 1d numpy array of measurements (e.g. over time)
Returns:
numpy.array: 2d array of dim len(onednparray)x2
2dnparray[i][0] = running average at i
2dnparray[i][1] = running standard deviation at i
for i in range(0,len(onednparray))
"""
averager = RunningStats()
nele = len(onednparray)
output = np.zeros((nele,2))
for i in range(nele):
averager.push(onednparray[i])
run_avg = averager.mean()
run_dev = averager.deviation()
# print run_avg, run_dev, averager.mean(), onednparray[i]
output[i,0] = run_avg
output[i,1] = run_dev
return output
[docs]class BlockAverager(object):
"""An object that keeps track of points for block averaging.
Attributes:
n_blocks (int): The current number of active blocks.
"""
def __init__(self, points_per_block=1000, min_points_in_block=500, store_data=False):
"""Init a the BlockAverager
Args:
points_per_block (int, Optional): The number of points to assign to a block before initiating a new block.
Default: 1000
min_points_in_block (int, Optional): The minimum number of points that a block (typically the last block)
can have and still be included in computing the final block average and standard error estimates. This
value should be <= points_per_block. Default: 500
"""
self._store_data = store_data
self._blocks = [RunningStats()]
if store_data:
self._blocks = [[]]
self.n_blocks = 1
self._points_per_block = points_per_block
if min_points_in_block > points_per_block:
self._min_points_in_block = points_per_block-1
else:
self._min_points_in_block = min_points_in_block
#print "points_per_block ",self._points_per_block, " min_p ",self._min_points_in_block
return
def _add_block(self):
"""Append a new block."""
if self._store_data:
self._blocks.append([])
else:
self._blocks.append(RunningStats())
self.n_blocks+=1
return
def _check_add_block(self):
"""Check whether to add a new block and do so if the condition is met."""
block_i = self.n_blocks - 1
if self._store_data:
if len(self._blocks[block_i]) >= self._points_per_block:
self._add_block()
else:
if self._blocks[block_i].n >= self._points_per_block:
self._add_block()
return
[docs] def push_single(self, datum):
"""Push a single data point (datum) into the block averager.
Args:
datum (float): The value to add to the block averaging.
"""
block_i = self.n_blocks-1
#print "pushing datum ",datum
if self._store_data:
self._blocks[block_i].append(datum)
else:
self._blocks[block_i].push(datum)
self._check_add_block()
return
[docs] def push_container(self, data):
"""Push a container (array or array like) of data points to the block averaging.
Args:
data (array like): The container (list, tuple, np.array, etc.) of data points to add to the block averaging.
"""
for datum in data:
#print(datum)
self.push_single(datum)
return
def _get_running(self):
"""Get the block average quantities from interanl RunningStats
objects.
"""
means = []
for block in self._blocks:
#print "block.n ",block.n, " min_p ",self._min_points_in_block
if block.n >= self._min_points_in_block:
means.append(block.mean())
means = np.array(means)
if len(means) > 1:
block_average = means.mean()
std_err = means.std()/np.sqrt(len(means))
elif len(means) == 1:
block_average = means[0]
std_err = 0.0
else:
block_average = 0.0
std_err = 0.0
return block_average, std_err
def _get_np(self):
"""Get the block average quantities from internally stored numpy
arrays.
"""
means = []
for block in self._blocks:
if len(block) >= self._min_points_in_block:
means.append(np.array(block).mean())
means = np.array(means)
if len(means) > 1:
block_average = means.mean()
std_err = means.std()/np.sqrt(len(means))
elif len(means) == 1:
block_average = means[0]
std_err = 0.0
else:
block_average = 0.0
std_err = 0.0
return block_average, std_err
[docs] def get(self):
"""Return the block average and standard error.
Returns:
tuple: Returns a length two tuple with the block average and standard error estimates.
"""
#print(self._blocks)
if self._store_data:
return self._get_np()
else:
return self._get_running()
def _aob_running(self):
"""Get the block average quantities from interanl RunningStats
objects.
"""
means = []
for block in self._blocks:
#print "block.n ",block.n, " min_p ",self._min_points_in_block
if block.n >= self._min_points_in_block:
means.append(block.mean())
means = np.array(means)
return means
def _aob_np(self):
"""Get the block average quantities from internally stored numpy
arrays.
"""
means = []
for block in self._blocks:
if len(block) >= self._min_points_in_block:
means.append(np.array(block).mean())
means = np.array(means)
return means
[docs] def averages_of_blocks(self):
"""Return the block average and standard error.
Returns:
tuple: Returns a length two tuple with the block average and standard error estimates.
"""
#print(self._blocks)
if self._store_data:
return self._aob_np()
else:
return self._aob_running()
def _sob_running(self):
"""Get the block average quantities from interanl RunningStats
objects.
"""
means = []
for block in self._blocks:
#print "block.n ",block.n, " min_p ",self._min_points_in_block
if block.n >= self._min_points_in_block:
means.append(block.deviation())
means = np.array(means)
return means
def _sob_np(self):
"""Get the block average quantities from internally stored numpy
arrays.
"""
means = []
for block in self._blocks:
if len(block) >= self._min_points_in_block:
means.append(np.array(block).std())
means = np.array(means)
return means
[docs] def standards_of_blocks(self):
"""Return the block average and standard error.
Returns:
tuple: Returns a length two tuple with the block average and standard error estimates.
"""
#print(self._blocks)
if self._store_data:
return self._sob_np()
else:
return self._sob_running()
[docs] def number_of_blocks(self):
"""Return the current number of blocks.
Returns:
int : The number of blocks.
"""
return self.n_blocks
[docs] def points_per_block(self):
"""Return information about the points per block.
Returns:
tuple: A three element tuple containing the setting for points per block, the setting for minimum points
per block, and the number of points in the last block.
"""
if self._store_data:
return self._points_per_block, self._min_points_in_block, len(self._blocks[self.n_blocks-1])
else:
return self._points_per_block, self._min_points_in_block, self._blocks[self.n_blocks - 1].n
[docs] def n_block(self):
if self._store_data:
n_block = 0
for block in self._blocks:
if len(block) >= self._min_points_in_block:
n_block += 1
return n_block
else:
n_block = 0
for block in self._blocks:
#print "block.n ",block.n, " min_p ",self._min_points_in_block
if block.n >= self._min_points_in_block:
n_block += 1
return n_block
[docs]def block_average_bse_v_size(data):
n_dat = len(data)
max_size = int(n_dat/3)
output = list()
for i in range(1, max_size+1, 1):
block_averager = BlockAverager(points_per_block=i, min_points_in_block=i)
block_averager.push_container(data)
avg, std_error = block_averager.get()
print(i/10, avg, std_error)
output.append([i, avg, std_error])
return np.array(output)
[docs]def binned_average(data, positions, n_bins=25, position_range=None, min_count=0):
"""Compute averages over a quantized range of histogram like bins.
Args:
data (np.array): A 1d numpy array of values.
positions (np.array): A 1d numpy array of positions corresponding to
the values in data. These are used to assign the values to the
histogram like bins for averaging.
n_bins (Optional[int]): Set the target number of bins to quantize the
position_range up into. Defaults to 25
position_range (Optional[tuple]): A two element tuple containing the
lower and upper range to bin the postions over; i.e.
(position_lower, postion_upper). Defaults to None, which uses
positions.min() and positions.max().
Returns:
tuple: returns a tuple with two numpy arrays of form (bins, averages)
Notes:
The function automatically filters out bins that have a zero count,
so the final value of the number of bins and values will be
len(bins) <= n_bins.
"""
lower = None
upper = None
if position_range is not None:
lower = position_range[0]
upper = position_range[1]
else:
lower = positions.min()
upper = positions.max()
edges = np.linspace(lower, upper, num=n_bins+1, endpoint=True)
bins = np.linspace(lower, upper, num=n_bins, endpoint=False)
counts = (np.zeros(len(bins))).astype(np.int64)
sums = np.zeros(len(bins))
n_data = len(data)
# Loop over the data points.
for i in range(n_data):
c_val = data[i]
pos = positions[i]
bin_index = None
# Select which bin (if any) the value corresponds to.
for j in range(1, len(bins)+1):
if (pos >= edges[j-1]) and (pos <= edges[j]):
bin_index = j - 1
break
if bin_index is not None:
counts[bin_index] += 1
sums[bin_index] += c_val
# Filter out the bins that had zero entries.
keep_bins = []
for i in range(len(counts)):
if counts[i] > 0:
keep_bins.append(i)
# Return the filtered bins and averages (i.e. without NaNs).
bins = bins[keep_bins]
counts = counts[keep_bins]
sums = sums[keep_bins]
averages = sums / counts
return bins, averages
#pair t-test for comparison of the difference of two means - null is zero difference
[docs]def pair_ttest(mean_1, std_err_1, n_1, mean_2, std_err_2, n_2):
m_diff = np.abs(mean_1 - mean_2)
diff_err = np.sqrt(std_err_1**2 + std_err_2**2)
degrees = n_1 + n_2 - 2
tval = m_diff / diff_err
pval = 2.0*(1.0 - stats.t.cdf(tval, df=degrees))
return pval
[docs]def block_avg_hist(nparray_1d, block_size, in_range='auto', scale=False, *args, **kwargs):
'''Creates histograms for each block and averages them to generate block
a single block averaged historgram.
'''
if in_range == 'auto':
in_range = [min(nparray_1d), max(nparray_1d)]
# Number of blocks of block_size
nblocks = int(len(nparray_1d)/block_size)
# print(nblocks)
# Trim the array to just the points to use with the blocking
array_trim = nparray_1d[:block_size*nblocks]
blocks = [array_trim[i*block_size:(i+1)*block_size] for i in range(nblocks)]
# print(len(blocks))
# print(len(blocks[0]))
# print(len(blocks[1]))
counts, edges = np.histogram(blocks[0], *args, range=in_range, **kwargs)
# print(counts)
c_list = [counts]
for i in range(1,nblocks):
counts, edges = np.histogram(blocks[i], *args, range=in_range, **kwargs)
# print(counts)
c_list.append(counts)
stacked = np.stack(c_list, axis=1)
# print(stacked)
avg_count = stacked.mean(axis=1)
# print(avg_count)
se_count = stacked.std(axis=1)/np.sqrt(nblocks)
centers = 0.5*(edges[1:] + edges[:-1])
if scale:
avg_count /= block_size
se_count /= block_size
return avg_count, se_count, centers