# Source code for cosmo_utils.utils.stats_funcs

#! /usr/bin/env python
# -*- coding: utf-8 -*-

# Victor Calderon
# Created      : 2018-04-28
from __future__ import absolute_import, division, print_function
__author__     = ['Victor Calderon']
__email__      = ['victor.calderon@vanderbilt.edu']
__maintainer__ = ['Victor Calderon']
__all__        = [  "myceil",
"myfloor",
"Bins_array_create",
"sigma_calcs",
"Stats_one_arr"]
"""
Set of statistical functions
"""

## Import modules
import math
import numpy as np
from   cosmo_utils.utils             import file_utils as fd
from   cosmo_utils.custom_exceptions import LSSUtils_Error

## Functions

# Upper-bound values
def myceil(x, base=10):
    """
    Determines the upper-bound integer for a given number with a given base.

    Parameters
    ----------
    x : float
        Number to be approximated to the closest number to `base`

    base : float, optional
        Base used to calculate the closest largest number.
        This variable is set to 10 by default.

    Returns
    ----------
    y : float
        Closest float number to `x`, i.e. upper-bound float

    Examples
    ----------
    >>> myceil(12, 10)
    20.0

    >>> myceil(12.05, 1.)
    13.0

    >>> myceil(12.05, 0.5)
    12.5
    """
    # Scale `x` down by `base`, round up to the next integer, then scale back.
    y = float(base * math.ceil(float(x) / base))

    return y

## Lower-bound values
def myfloor(x, base=10):
    """
    Determines the lower-bound integer for a given number with a given base.

    Parameters
    ----------
    x : float
        Number to be approximated to the closest number to `base`

    base : float, optional
        Base used to calculate the closest smallest number.
        This variable is set to 10 by default.

    Returns
    ----------
    y : float
        Closest float number to `x`, i.e. lower-bound float

    Examples
    ----------
    >>> myfloor(12, 10)
    10.0

    >>> myfloor(12.05, 1.)
    12.0

    >>> myfloor(12.05, 0.2)
    12.0
    """
    # Scale `x` down by `base`, round down to the previous integer, then
    # scale back.
    y = float(base * math.floor(float(x) / base))

    return y

## Generation of bins evenly spaced out
def Bins_array_create(arr, base=10, return_tuple=False):
    """
    Generates an evenly-spaced array between the minimum and maximum value
    of a given array.

    Parameters
    ----------
    arr : array_like
        Array of numbers or floats

    base : int or float, optional
        Interval used to create the evenly-spaced array of elements.
        This variable is set to 10 by default.

    return_tuple : bool, optional
        If True, the function returns an array of `[left, right]` edge
        pairs, one row per bin. This variable is set to False by default.

    Returns
    ----------
    bins_arr : numpy.ndarray
        Array of elements separated in intervals of `base`

    Raises
    ----------
    LSSUtils_Error
        Program exception if `arr` is not 1-dimensional.
    """
    file_msg = fd.Program_Msg(__file__)
    # Transforming input data
    base = float(base)
    arr  = np.asarray(arr)
    # Checking array dimensions
    if arr.ndim != 1:
        msg = '{0} The input array is not of dimension 1, but of {1}'.format(
            file_msg, arr.ndim)
        raise LSSUtils_Error(msg)
    # Creating evenly-spaced array. The `0.5 * base` padding guarantees that
    # the upper edge is included despite floating-point round-off in
    # `np.arange` (whose stop value is exclusive).
    arr_min  = myfloor(arr.min(), base=base)
    arr_max  = myceil(arr.max(), base=base)
    bins_arr = np.arange(arr_min, arr_max + 0.5 * base, base)
    # Creating array of bin-edge pairs if necessary
    if return_tuple:
        bins_arr_mod = np.array([[bins_arr[ii], bins_arr[ii + 1]]
                                 for ii in range(len(bins_arr) - 1)])
        return_obj = bins_arr_mod
    else:
        return_obj = bins_arr

    return return_obj

## Calculations of percentiles and sigmas
def sigma_calcs(data_arr, type_sigma='std', perc_arr=(68., 95., 99.7),
                return_mean_std=False):
    """
    Calculates the 1-, 2-, and 3-sigma ranges for `data_arr`.

    Parameters
    -----------
    data_arr : numpy.ndarray or list
        Array of values (1-dimensional, 2-dimensional, or a list of
        per-bin arrays) from which to calculate percentiles or St. Dev.

    type_sigma : {'perc', 'std'} string, optional (default = 'std')
        Option for calculating either percentiles or standard deviations
        - 'perc': calculates percentiles
        - 'std' : uses standard deviations as 1-, 2-, and 3-sigmas

    perc_arr : array_like, optional (default = (68., 95., 99.7))
        Array of percentiles to calculate

    return_mean_std : bool, optional (default = False)
        Option for returning mean and St. Dev. along with `sigma_dict`

    Returns
    ----------
    sigma_dict : python dictionary
        Dictionary containing the 1-, 2-, and 3-sigma upper and lower
        ranges for `data_arr`

    mark_mean : array_like
        Array of the mean value of `data_arr`.
        Only returned if `return_mean_std == True`

    mark_std : array_like
        Array of the St. Dev. value of `data_arr`.
        Only returned if `return_mean_std == True`

    Raises
    ----------
    LSSUtils_Error
        Program exception if input parameters are not valid.
    """
    file_msg = fd.Program_Msg(__file__)
    ## Checking input variables
    # data_arr
    data_arr_valid_types = (np.ndarray, list)
    if not isinstance(data_arr, data_arr_valid_types):
        msg = '{0} data_arr ({1}) is not a valid type!'.format(
            file_msg, type(data_arr))
        raise LSSUtils_Error(msg)
    else:
        data_arr = np.asarray(data_arr)
    # type_sigma
    type_sigma_valid = ['perc', 'std']
    if not isinstance(type_sigma, str):
        msg = '{0} type_sigma ({1}) is not a valid type!'.format(
            file_msg, type(type_sigma))
        raise LSSUtils_Error(msg)
    if type_sigma not in type_sigma_valid:
        msg = '{0} type_sigma ({1}) is not a valid input choice!'.format(
            file_msg, type_sigma)
        # BUG FIX: the error message used to be built here but never raised,
        # silently accepting invalid `type_sigma` values.
        raise LSSUtils_Error(msg)
    ## Determining if the object is a list of lists or not
    data_arr_type = [[] for x in range(len(data_arr))]
    arr_type      = (np.ndarray, list)
    # Checking type of each element of the input parameter
    for zz, data_zz in enumerate(data_arr):
        data_arr_type[zz] = isinstance(data_zz, arr_type)
    # Choosing axis
    if all([(zz == True) for zz in data_arr_type]):
        # If data_arr is a list of lists
        list_opt = True
        # Checking data dimension: a 1-D object array means the per-bin
        # arrays have different lengths (ragged); 2-D means equal lengths.
        if (data_arr.ndim == 1):
            # Array of multiple elements in each bin
            ax_opt = True
        else:
            # Same number of elements in each bin
            ax_opt = False
    else:
        # If data_arr is a 1D array
        list_opt = False
        # Checking data dimension
        if (data_arr.ndim == 1):
            ax_opt = False
        else:
            msg = '{0} Invalid input of data_arr'.format(file_msg)
            raise TypeError(msg)
    # Determining mean and standard deviation
    if list_opt:
        # If different number of elements in each bin
        if ax_opt:
            # Number of bins in data_arr
            nbins = len(data_arr)
            # Calculating statistics (initialized to NaN)
            mark_mean = np.zeros(nbins) * np.nan
            mark_std  = np.zeros(nbins) * np.nan
            # Calculating mean and stdev for each bin
            for ii, data_ii in enumerate(data_arr):
                mark_mean[ii] = np.nanmean(data_ii, axis=0)
                mark_std [ii] = np.nanstd( data_ii, axis=0)
        else:
            # Calculating mean and stdev for each bin
            mark_mean = np.nanmean(data_arr, axis=1)
            mark_std  = np.nanstd( data_arr, axis=1)
    else:
        # When dealing with a 1D array
        mark_mean = np.nanmean(data_arr, axis=0)
        mark_std  = np.nanstd( data_arr, axis=0)
    #
    ## Determining errors in each bin
    # Creating dictionary for saving sigmas
    sigma_dict = {ii: [] for ii in range(len(perc_arr))}
    # Using percentiles to estimate errors
    if (type_sigma == 'perc'):
        # If data_arr is a list
        if list_opt:
            # If different number of elements in each bin
            if ax_opt:
                # Number of bins in data_arr
                nbins = len(data_arr)
                # Looping over sigma values
                for zz, perc_zz in enumerate(perc_arr):
                    # Defining lower and upper ranges
                    mark_lims = np.zeros((nbins, 2)) * np.nan
                    # Populating lower and upper limits
                    for ii, data_ii in enumerate(data_arr):
                        # Symmetric interval around the median
                        perc_lims  = [50 - (perc_zz/2.), 50 + (perc_zz/2.)]
                        mark_lower = np.nanpercentile(data_ii, perc_lims[0])
                        mark_upper = np.nanpercentile(data_ii, perc_lims[1])
                        # Saving to mark_lims
                        mark_lims[ii] = [mark_lower, mark_upper]
                    # Saving to sigma_dict
                    sigma_dict[zz] = mark_lims.T
            else:
                # If same number of elements in each bin
                # Looping over sigma values
                for zz, perc_zz in enumerate(perc_arr):
                    perc_lims  = [50 - (perc_zz/2.), 50 + (perc_zz/2.)]
                    mark_lower = np.nanpercentile(data_arr, perc_lims[0], axis=1)
                    mark_upper = np.nanpercentile(data_arr, perc_lims[1], axis=1)
                    mark_lims  = np.column_stack((mark_lower, mark_upper))
                    # Saving to dictionary
                    sigma_dict[zz] = mark_lims.T
        else:
            # Looping over sigma's
            for zz, perc_zz in enumerate(perc_arr):
                perc_lims  = [50 - (perc_zz/2.), 50 + (perc_zz/2.)]
                mark_lower = np.nanpercentile(data_arr, perc_lims[0], axis=0)
                mark_upper = np.nanpercentile(data_arr, perc_lims[1], axis=0)
                mark_lims  = np.column_stack((mark_lower, mark_upper))[0]
                # Saving to dictionary
                sigma_dict[zz] = mark_lims
    # Using standard deviation to estimate errors
    if (type_sigma == 'std'):
        # Number of bins in perc_arr
        nperc = len(perc_arr)
        # Looping over St. Dev. ranges: the (zz + 1)-sigma band
        for zz in range(nperc):
            mark_lower = mark_mean - (mark_std * (zz + 1))
            mark_upper = mark_mean + (mark_std * (zz + 1))
            # Populating lower and upper limits
            sigma_dict[zz] = np.column_stack((mark_lower, mark_upper)).T
    #
    # Fixing values for when it's a 1D array
    if (not list_opt) and (not ax_opt):
        for zz in range(len(sigma_dict.keys())):
            sigma_dict[zz] = sigma_dict[zz].flatten()
    #
    # Deciding which objects to return
    if return_mean_std:
        return_obj = sigma_dict, mark_mean, mark_std
    else:
        return_obj = sigma_dict

    return return_obj

## Main framework for Stats_one_arr and Stats_two_arr
def Stats_one_arr(x, y, base=1., arr_len=0, arr_digit='n',
                  statfunc=np.nanmean, bin_statval='average',
                  return_perc=False, failval=np.nan, type_sigma='std',
                  return_dict=False):
    """
    Calculates statistics for 2 arrays, binned along `x`.

    Parameters
    ----------
    x, y : array_like, shape(N,)
        Sets of elements for the 1st and 2nd observable

    base : float, optional
        Bin width in units of `x`. This variable is set to 1. by default.

    arr_len : int, optional
        Minimum number of elements in each bin of `x`

    arr_digit : {'n', 'y', 'o'} str, optional
        Option for which elements to return.

        Options:
            - 'n' : Returns x_stat, y_stat, y_std, y_std_err
            - 'y' : Returns x_stat, y_stat, y_std, y_std_err, x_bins_data, y_bins_data
            - 'o' : Returns x_bins_data, y_bins_data

    statfunc : {numpy.nanmean, numpy.nanmedian} statistical func, optional
        Numerical function used to calculate on bins of data.
        By default, this variable is set to numpy.nanmean

    bin_statval : {'average', 'left', 'right', 'center'} str, optional
        Option for where to put the bin values of `x` and `y`.
        By default, this variable is set to 'average', which means
        that the values are those of the averages of the bins in `x` and
        `y`.

    return_perc : bool, optional
        If True, it also returns the percentiles of the data.
        Last item in the return list.
        This variable is set to False by default.

    failval : int, float, NoneType, or NaN, optional
        This is the value used when no data is available for the bin.
        This is set to numpy.nan by default

    type_sigma : {'perc', 'std'} string, optional (default = 'std')
        Option for calculating either percentiles or standard deviations.

        Options:
            - 'perc' : calculates percentiles
            - 'std'  : uses standard deviations as 1-, 2-, and 3-sigmas

    return_dict : bool, optional
        If True, the function returns a dict with the binned statistics
        and more. This variable is set to False by default.

    Returns
    ----------
    x_stat, y_stat : array_like
        Binned array of elements from `x`

    y_std : array_like
        Standard deviation of the binned array in `x`

    y_std_err : array_like
        Error in the `statfunc` of `y`

    x_bins_data : array_like, optional
        Elements of `x` in each bin with spacing of `base`.
        Only returned if arr_digit == 'y' or 'o'

    y_bins_data : array_like, optional
        Elements of `y` in each bin with spacing of `base`.
        Only returned if arr_digit == 'y' or 'o'

    perc_arr_lims : dict, optional
        Percentile/sigma limits in each bin of `x_stat`.
        Only returned if return_perc == True

    Raises
    ----------
    LSSUtils_Error
        Program exception if input parameters are not valid.
    """
    file_msg = fd.Program_Msg(__file__)
    ## Verifying input values
    # arr_digit
    if not ((arr_digit == 'y') or (arr_digit == 'n') or (arr_digit == 'o')):
        msg = '{0} arr_digit ({1}) is not a valid input. Exiting'.format(
            file_msg, arr_digit)
        raise LSSUtils_Error(msg)
    # Array dimensions
    if not ((len(x) > 0) and (len(y) > 0)):
        msg = '{0} The arrays x and y must have at least one value'
        msg = msg.format(file_msg)
        raise LSSUtils_Error(msg)
    if not ((np.asarray(x).ndim == 1) and (np.asarray(y).ndim == 1)):
        msg = '{0} The arrays x and y must have dimension of 1'
        msg = msg.format(file_msg)
        raise LSSUtils_Error(msg)
    # arr_len
    if not (arr_len >= 0):
        msg = '{0} arr_len ({1}) must be greater or equal than zero!'.format(
            file_msg, arr_len)
        raise LSSUtils_Error(msg)
    # bin_statval
    if not (bin_statval in ['average', 'left', 'right', 'center']):
        msg = '{0} bin_statval ({1}) is not a valid input! Exiting'.format(
            file_msg, bin_statval)
        raise LSSUtils_Error(msg)
    ##
    ## Converting arrays to numpy arrays
    x       = np.asarray(x)
    y       = np.asarray(y)
    # `arr_len` is compared with a strict `>` below, so shift it down by one
    # to make the user-facing value an inclusive minimum count.
    arr_len = int(arr_len - 1.) if arr_len != 0 else int(arr_len)
    ##
    ## Statistics calculations
    x_bins_edges = Bins_array_create(x, base=base)
    # Zero-based bin index for each element of `x`
    x_digits     = np.digitize(x, x_bins_edges) - 1
    # Array of [left, right] edge pairs for x_bins
    x_bins = Bins_array_create(x, base=base, return_tuple=True)
    nbins  = len(x_bins)
    ##
    ## Determining which bins to use
    ## These are the bins that meet the criteria of `arr_len`
    x_digits_bins = np.array([int(ii) for ii in range(nbins) if
                              len(x_digits[x_digits == ii]) > arr_len])
    # Running only if there is data
    if (len(x_digits_bins) > 0):
        # Elements in each bin
        # NOTE(review): these per-bin arrays may be ragged; recent NumPy
        # versions require `dtype=object` to build ragged arrays — confirm
        # against the supported NumPy version.
        # x values
        x_bins_data = np.array([x[x_digits == ii] for ii in x_digits_bins])
        # y values
        y_bins_data = np.array([y[x_digits == ii] for ii in x_digits_bins])
        # Bins that meet the arr_len criteria
        x_bins_criteria = x_bins[x_digits_bins]
        ## Selecting data in bins
        if (bin_statval == 'left'):
            x_stat = x_bins_criteria.T[0]
        elif (bin_statval == 'right'):
            x_stat = x_bins_criteria.T[1]
        elif (bin_statval == 'center'):
            x_stat = np.mean(x_bins_criteria, axis=1)
        elif (bin_statval == 'average'):
            x_stat = np.array([np.nanmean(ii) if (len(ii) > arr_len)
                               else failval for ii in x_bins_data])
        # Determining the values in y
        # stat_function
        y_stat = np.array([statfunc(ii) for ii in y_bins_data])
        # Standard Deviation
        y_std  = np.array([np.nanstd(ii) for ii in y_bins_data])
        # Error in the mean/median
        y_std_err = np.array([np.nanstd(ii)/math.sqrt(len(ii)) for ii in
                              y_bins_data])
    else:
        # No bin passed the `arr_len` criteria; return NaN placeholders
        x_bins_data     = np.array([np.nan])
        y_bins_data     = np.array([np.nan])
        x_bins_criteria = np.array([np.nan])
        x_stat          = np.array([np.nan])
        y_stat          = np.array([np.nan])
        y_std           = np.array([np.nan])
        y_std_err       = np.array([np.nan])
    ##
    ## Correcting error if statfunc == numpy.nanmedian
    # The standard error of the median is ~1.253x that of the mean for
    # normally-distributed data.
    if statfunc == np.nanmedian:
        y_std_err *= 1.253
    ##
    ## Returning percentiles
    perc_arr_lims = sigma_calcs(y_bins_data, type_sigma=type_sigma)
    ##
    # Building dictionary with every computed quantity
    xy_dict                  = {}
    xy_dict['x_stat'       ] = x_stat
    xy_dict['y_stat'       ] = y_stat
    xy_dict['y_std'        ] = y_std
    xy_dict['y_std_err'    ] = y_std_err
    xy_dict['perc_arr_lims'] = perc_arr_lims
    xy_dict['x_bins_data'  ] = x_bins_data
    xy_dict['y_bins_data'  ] = y_bins_data
    # Determine entries to return
    if (arr_digit == 'n'):
        return_val = ['x_stat', 'y_stat', 'y_std', 'y_std_err']
    if (arr_digit == 'y'):
        return_val = ['x_stat', 'y_stat', 'y_std', 'y_std_err',
                      'x_bins_data', 'y_bins_data']
    if (arr_digit == 'o'):
        return_val = ['x_bins_data', 'y_bins_data']
    # Aggregating percentage/std info if necessary
    if return_perc:
        return_val.append('perc_arr_lims')
    # Determining object to return
    if return_dict:
        # Initiating dictionary with output values
        return_obj = {}
        # Populating output object
        for key_val in return_val:
            return_obj[key_val] = xy_dict[key_val]
    else:
        # Initiating output object
        return_obj = [[] for x in range(len(return_val))]
        # Populating output object
        for jj, key_val in enumerate(return_val):
            return_obj[jj] = xy_dict[key_val]

    return return_obj