# Source code for cosmo_utils.utils.stats_funcs

#! /usr/bin/env python
# -*- coding: utf-8 -*-

# Victor Calderon
# Created      : 2018-04-28
from __future__ import absolute_import, division, print_function
__author__     = ['Victor Calderon']
__email__      = ['victor.calderon@vanderbilt.edu']
__maintainer__ = ['Victor Calderon']
__all__        = [  "myceil",
"myfloor",
"Bins_array_create",
"sigma_calcs",
"Stats_one_arr"]
"""
Set of statistical functions
"""

## Import modules
import math
import numpy as np
from   cosmo_utils.utils             import file_utils as fd
from   cosmo_utils.custom_exceptions import LSSUtils_Error

## Functions

# Upper-bound values
def myceil(x, base=10):
    """
    Determines the upper-bound integer for a given number with a given base.

    Parameters
    ----------
    x : float
        Number to be approximated to the closest number to `base`

    base : float, optional
        Base used to calculate the closest largest number.
        This variable is set to 10 by default.

    Returns
    ----------
    y : float
        Closest float number to `x`, i.e. upper-bound float

    Examples
    ----------
    >>> myceil(12, 10)
    20.0

    >>> myceil(12.05, 1.)
    13.0

    >>> myceil(12.05, 0.5)
    12.5
    """
    # Scale `x` down by `base`, round up to the next integer, then scale back.
    y = float(base * math.ceil(float(x) / base))

    return y

## Lower-bound values
def myfloor(x, base=10):
    """
    Determines the lower-bound integer for a given number with a given base.

    Parameters
    ----------
    x : float
        Number to be approximated to the closest number to `base`

    base : float, optional
        Base used to calculate the closest smallest number.
        This variable is set to 10 by default.

    Returns
    ----------
    y : float
        Closest float number to `x`, i.e. lower-bound float

    Examples
    ----------
    >>> myfloor(12, 10)
    10.0

    >>> myfloor(12.05, 1.)
    12.0

    >>> myfloor(12.05, 0.2)
    12.0
    """
    # Scale `x` down by `base`, round down to the previous integer, then
    # scale back.
    y = float(base * math.floor(float(x) / base))

    return y

## Generation of bins evenly spaced out
def Bins_array_create(arr, base=10, return_tuple=False):
    """
    Generates an evenly-spaced array between the minimum and maximum value
    of a given array.

    Parameters
    ----------
    arr : array_like
        Array of numbers or floats

    base : int or float, optional
        Interval used to create the evenly-spaced array of elements.
        This variable is set to 10 by default.

    return_tuple : bool, optional
        If True, the function returns an array of `[left, right]` edge
        pairs, one row per bin. This variable is set to False by default.

    Returns
    ----------
    bins_arr : numpy.ndarray
        Array of elements separated in intervals of `base`

    Raises
    ----------
    LSSUtils_Error
        Program exception if `arr` is not 1-dimensional.
    """
    file_msg = fd.Program_Msg(__file__)
    # Transforming input data
    base = float(base)
    arr  = np.asarray(arr)
    # Checking array dimensions
    if arr.ndim != 1:
        msg = '{0} The input array is not of dimension 1, but of {1}'.format(
            file_msg, arr.ndim)
        raise LSSUtils_Error(msg)
    # Creating evenly-spaced array. The `0.5 * base` padding guarantees that
    # the upper edge is included despite floating-point round-off in
    # `np.arange` (whose stop value is exclusive).
    arr_min  = myfloor(arr.min(), base=base)
    arr_max  = myceil(arr.max(), base=base)
    bins_arr = np.arange(arr_min, arr_max + 0.5 * base, base)
    # Creating array of bin-edge pairs if necessary
    if return_tuple:
        bins_arr_mod = np.array([[bins_arr[ii], bins_arr[ii + 1]]
                                 for ii in range(len(bins_arr) - 1)])
        return_obj = bins_arr_mod
    else:
        return_obj = bins_arr

    return return_obj

## Calculations of percentiles and sigmas
def sigma_calcs(data_arr, type_sigma='std', perc_arr=(68., 95., 99.7),
                return_mean_std=False):
    """
    Calculates the 1-, 2-, and 3-sigma ranges for `data_arr`.

    Parameters
    -----------
    data_arr : numpy.ndarray or list
        Array of values (1-dimensional, 2-dimensional, or a list of
        per-bin arrays) from which to calculate percentiles or St. Dev.

    type_sigma : {'perc', 'std'} string, optional (default = 'std')
        Option for calculating either percentiles or standard deviations
        - 'perc': calculates percentiles
        - 'std' : uses standard deviations as 1-, 2-, and 3-sigmas

    perc_arr : array_like, optional (default = (68., 95., 99.7))
        Array of percentiles to calculate

    return_mean_std : bool, optional (default = False)
        Option for returning mean and St. Dev. along with `sigma_dict`

    Returns
    ----------
    sigma_dict : python dictionary
        Dictionary containing the 1-, 2-, and 3-sigma upper and lower
        ranges for `data_arr`

    mark_mean : array_like
        Array of the mean value of `data_arr`.
        Only returned if `return_mean_std == True`

    mark_std : array_like
        Array of the St. Dev. value of `data_arr`.
        Only returned if `return_mean_std == True`

    Raises
    ----------
    LSSUtils_Error
        Program exception if input parameters are not valid.
    """
    file_msg = fd.Program_Msg(__file__)
    ## Checking input variables
    # data_arr
    data_arr_valid_types = (np.ndarray, list)
    if not isinstance(data_arr, data_arr_valid_types):
        msg = '{0} data_arr ({1}) is not a valid type!'.format(
            file_msg, type(data_arr))
        raise LSSUtils_Error(msg)
    else:
        data_arr = np.asarray(data_arr)
    # type_sigma
    type_sigma_valid = ['perc', 'std']
    if not isinstance(type_sigma, str):
        msg = '{0} type_sigma ({1}) is not a valid type!'.format(
            file_msg, type(type_sigma))
        raise LSSUtils_Error(msg)
    if type_sigma not in type_sigma_valid:
        msg = '{0} type_sigma ({1}) is not a valid input choice!'.format(
            file_msg, type_sigma)
        # BUG FIX: the error message used to be built here but never raised,
        # silently accepting invalid `type_sigma` values.
        raise LSSUtils_Error(msg)
    ## Determining if the object is a list of lists or not
    data_arr_type = [[] for x in range(len(data_arr))]
    arr_type      = (np.ndarray, list)
    # Checking type of each element of the input parameter
    for zz, data_zz in enumerate(data_arr):
        data_arr_type[zz] = isinstance(data_zz, arr_type)
    # Choosing axis
    if all([(zz == True) for zz in data_arr_type]):
        # If data_arr is a list of lists
        list_opt = True
        # Checking data dimension: a 1-D object array means the per-bin
        # arrays have different lengths (ragged); 2-D means equal lengths.
        if (data_arr.ndim == 1):
            # Array of multiple elements in each bin
            ax_opt = True
        else:
            # Same number of elements in each bin
            ax_opt = False
    else:
        # If data_arr is a 1D array
        list_opt = False
        # Checking data dimension
        if (data_arr.ndim == 1):
            ax_opt = False
        else:
            msg = '{0} Invalid input of data_arr'.format(file_msg)
            raise TypeError(msg)
    # Determining mean and standard deviation
    if list_opt:
        # If different number of elements in each bin
        if ax_opt:
            # Number of bins in data_arr
            nbins = len(data_arr)
            # Calculating statistics (initialized to NaN)
            mark_mean = np.zeros(nbins) * np.nan
            mark_std  = np.zeros(nbins) * np.nan
            # Calculating mean and stdev for each bin
            for ii, data_ii in enumerate(data_arr):
                mark_mean[ii] = np.nanmean(data_ii, axis=0)
                mark_std [ii] = np.nanstd( data_ii, axis=0)
        else:
            # Calculating mean and stdev for each bin
            mark_mean = np.nanmean(data_arr, axis=1)
            mark_std  = np.nanstd( data_arr, axis=1)
    else:
        # When dealing with a 1D array
        mark_mean = np.nanmean(data_arr, axis=0)
        mark_std  = np.nanstd( data_arr, axis=0)
    #
    ## Determining errors in each bin
    # Creating dictionary for saving sigmas
    sigma_dict = {ii: [] for ii in range(len(perc_arr))}
    # Using percentiles to estimate errors
    if (type_sigma == 'perc'):
        # If data_arr is a list
        if list_opt:
            # If different number of elements in each bin
            if ax_opt:
                # Number of bins in data_arr
                nbins = len(data_arr)
                # Looping over sigma values
                for zz, perc_zz in enumerate(perc_arr):
                    # Defining lower and upper ranges
                    mark_lims = np.zeros((nbins, 2)) * np.nan
                    # Populating lower and upper limits
                    for ii, data_ii in enumerate(data_arr):
                        # Symmetric interval around the median
                        perc_lims  = [50 - (perc_zz/2.), 50 + (perc_zz/2.)]
                        mark_lower = np.nanpercentile(data_ii, perc_lims[0])
                        mark_upper = np.nanpercentile(data_ii, perc_lims[1])
                        # Saving to mark_lims
                        mark_lims[ii] = [mark_lower, mark_upper]
                    # Saving to sigma_dict
                    sigma_dict[zz] = mark_lims.T
            else:
                # If same number of elements in each bin
                # Looping over sigma values
                for zz, perc_zz in enumerate(perc_arr):
                    perc_lims  = [50 - (perc_zz/2.), 50 + (perc_zz/2.)]
                    mark_lower = np.nanpercentile(data_arr, perc_lims[0], axis=1)
                    mark_upper = np.nanpercentile(data_arr, perc_lims[1], axis=1)
                    mark_lims  = np.column_stack((mark_lower, mark_upper))
                    # Saving to dictionary
                    sigma_dict[zz] = mark_lims.T
        else:
            # Looping over sigma's
            for zz, perc_zz in enumerate(perc_arr):
                perc_lims  = [50 - (perc_zz/2.), 50 + (perc_zz/2.)]
                mark_lower = np.nanpercentile(data_arr, perc_lims[0], axis=0)
                mark_upper = np.nanpercentile(data_arr, perc_lims[1], axis=0)
                mark_lims  = np.column_stack((mark_lower, mark_upper))[0]
                # Saving to dictionary
                sigma_dict[zz] = mark_lims
    # Using standard deviation to estimate errors
    if (type_sigma == 'std'):
        # Number of bins in perc_arr
        nperc = len(perc_arr)
        # Looping over St. Dev. ranges: the (zz + 1)-sigma band
        for zz in range(nperc):
            mark_lower = mark_mean - (mark_std * (zz + 1))
            mark_upper = mark_mean + (mark_std * (zz + 1))
            # Populating lower and upper limits
            sigma_dict[zz] = np.column_stack((mark_lower, mark_upper)).T
    #
    # Fixing values for when it's a 1D array
    if (not list_opt) and (not ax_opt):
        for zz in range(len(sigma_dict.keys())):
            sigma_dict[zz] = sigma_dict[zz].flatten()
    #
    # Deciding which objects to return
    if return_mean_std:
        return_obj = sigma_dict, mark_mean, mark_std
    else:
        return_obj = sigma_dict

    return return_obj

## Main framework for Stats_one_arr and Stats_two_arr
def Stats_one_arr(x, y, base=1., arr_len=0, arr_digit='n',
                  statfunc=np.nanmean, bin_statval='average',
                  return_perc=False, failval=np.nan, type_sigma='std',
                  return_dict=False):
    """
    Calculates statistics for 2 arrays, binned along `x`.

    Parameters
    ----------
    x, y : array_like, shape(N,)
        Sets of elements for the 1st and 2nd observable

    base : float, optional
        Bin width in units of `x`. This variable is set to 1. by default.

    arr_len : int, optional
        Minimum number of elements in each bin of `x`

    arr_digit : {'n', 'y', 'o'} str, optional
        Option for which elements to return.

        Options:
            - 'n' : Returns x_stat, y_stat, y_std, y_std_err
            - 'y' : Returns x_stat, y_stat, y_std, y_std_err, x_bins_data, y_bins_data
            - 'o' : Returns x_bins_data, y_bins_data

    statfunc : {numpy.nanmean, numpy.nanmedian} statistical func, optional
        Numerical function used to calculate on bins of data.
        By default, this variable is set to numpy.nanmean

    bin_statval : {'average', 'left', 'right', 'center'} str, optional
        Option for where to put the bin values of `x` and `y`.
        By default, this variable is set to 'average', which means
        that the values are those of the averages of the bins in `x` and
        `y`.

    return_perc : bool, optional
        If True, it also returns the percentiles of the data.
        Last item in the return list.
        This variable is set to False by default.

    failval : int, float, NoneType, or NaN, optional
        This is the value used when no data is available for the bin.
        This is set to numpy.nan by default

    type_sigma : {'perc', 'std'} string, optional (default = 'std')
        Option for calculating either percentiles or standard deviations.

        Options:
            - 'perc' : calculates percentiles
            - 'std'  : uses standard deviations as 1-, 2-, and 3-sigmas

    return_dict : bool, optional
        If True, the function returns a dict with the binned statistics
        and more. This variable is set to False by default.

    Returns
    ----------
    x_stat, y_stat : array_like
        Binned array of elements from `x`

    y_std : array_like
        Standard deviation of the binned array in `x`

    y_std_err : array_like
        Error in the `statfunc` of `y`

    x_bins_data : array_like, optional
        Elements of `x` in each bin with spacing of `base`.
        Only returned if arr_digit == 'y' or 'o'

    y_bins_data : array_like, optional
        Elements of `y` in each bin with spacing of `base`.
        Only returned if arr_digit == 'y' or 'o'

    perc_arr_lims : dict, optional
        Percentile/sigma limits in each bin of `x_stat`.
        Only returned if return_perc == True

    Raises
    ----------
    LSSUtils_Error
        Program exception if input parameters are not valid.
    """
    file_msg = fd.Program_Msg(__file__)
    ## Verifying input values
    # arr_digit
    if not ((arr_digit == 'y') or (arr_digit == 'n') or (arr_digit == 'o')):
        msg = '{0} arr_digit ({1}) is not a valid input. Exiting'.format(
            file_msg, arr_digit)
        raise LSSUtils_Error(msg)
    # Array dimensions
    if not ((len(x) > 0) and (len(y) > 0)):
        msg = '{0} The arrays x and y must have at least one value'
        msg = msg.format(file_msg)
        raise LSSUtils_Error(msg)
    if not ((np.asarray(x).ndim == 1) and (np.asarray(y).ndim == 1)):
        msg = '{0} The arrays x and y must have dimension of 1'
        msg = msg.format(file_msg)
        raise LSSUtils_Error(msg)
    # arr_len
    if not (arr_len >= 0):
        msg = '{0} arr_len ({1}) must be greater or equal than zero!'.format(
            file_msg, arr_len)
        raise LSSUtils_Error(msg)
    # bin_statval
    if not (bin_statval in ['average', 'left', 'right', 'center']):
        msg = '{0} bin_statval ({1}) is not a valid input! Exiting'.format(
            file_msg, bin_statval)
        raise LSSUtils_Error(msg)
    ##
    ## Converting arrays to numpy arrays
    x       = np.asarray(x)
    y       = np.asarray(y)
    # `arr_len` is compared with a strict `>` below, so shift it down by one
    # to make the user-facing value an inclusive minimum count.
    arr_len = int(arr_len - 1.) if arr_len != 0 else int(arr_len)
    ##
    ## Statistics calculations
    x_bins_edges = Bins_array_create(x, base=base)
    # Zero-based bin index for each element of `x`
    x_digits     = np.digitize(x, x_bins_edges) - 1
    # Array of [left, right] edge pairs for x_bins
    x_bins = Bins_array_create(x, base=base, return_tuple=True)
    nbins  = len(x_bins)
    ##
    ## Determining which bins to use
    ## These are the bins that meet the criteria of `arr_len`
    x_digits_bins = np.array([int(ii) for ii in range(nbins) if
                              len(x_digits[x_digits == ii]) > arr_len])
    # Running only if there is data
    if (len(x_digits_bins) > 0):
        # Elements in each bin
        # NOTE(review): these per-bin arrays may be ragged; recent NumPy
        # versions require `dtype=object` to build ragged arrays — confirm
        # against the supported NumPy version.
        # x values
        x_bins_data = np.array([x[x_digits == ii] for ii in x_digits_bins])
        # y values
        y_bins_data = np.array([y[x_digits == ii] for ii in x_digits_bins])
        # Bins that meet the arr_len criteria
        x_bins_criteria = x_bins[x_digits_bins]
        ## Selecting data in bins
        if (bin_statval == 'left'):
            x_stat = x_bins_criteria.T[0]
        elif (bin_statval == 'right'):
            x_stat = x_bins_criteria.T[1]
        elif (bin_statval == 'center'):
            x_stat = np.mean(x_bins_criteria, axis=1)
        elif (bin_statval == 'average'):
            x_stat = np.array([np.nanmean(ii) if (len(ii) > arr_len)
                               else failval for ii in x_bins_data])
        # Determining the values in y
        # stat_function
        y_stat = np.array([statfunc(ii) for ii in y_bins_data])
        # Standard Deviation
        y_std  = np.array([np.nanstd(ii) for ii in y_bins_data])
        # Error in the mean/median
        y_std_err = np.array([np.nanstd(ii)/math.sqrt(len(ii)) for ii in
                              y_bins_data])
    else:
        # No bin passed the `arr_len` criteria; return NaN placeholders
        x_bins_data     = np.array([np.nan])
        y_bins_data     = np.array([np.nan])
        x_bins_criteria = np.array([np.nan])
        x_stat          = np.array([np.nan])
        y_stat          = np.array([np.nan])
        y_std           = np.array([np.nan])
        y_std_err       = np.array([np.nan])
    ##
    ## Correcting error if statfunc == numpy.nanmedian
    # The standard error of the median is ~1.253x that of the mean for
    # normally-distributed data.
    if statfunc == np.nanmedian:
        y_std_err *= 1.253
    ##
    ## Returning percentiles
    perc_arr_lims = sigma_calcs(y_bins_data, type_sigma=type_sigma)
    ##
    # Building dictionary with every computed quantity
    xy_dict                  = {}
    xy_dict['x_stat'       ] = x_stat
    xy_dict['y_stat'       ] = y_stat
    xy_dict['y_std'        ] = y_std
    xy_dict['y_std_err'    ] = y_std_err
    xy_dict['perc_arr_lims'] = perc_arr_lims
    xy_dict['x_bins_data'  ] = x_bins_data
    xy_dict['y_bins_data'  ] = y_bins_data
    # Determine entries to return
    if (arr_digit == 'n'):
        return_val = ['x_stat', 'y_stat', 'y_std', 'y_std_err']
    if (arr_digit == 'y'):
        return_val = ['x_stat', 'y_stat', 'y_std', 'y_std_err',
                      'x_bins_data', 'y_bins_data']
    if (arr_digit == 'o'):
        return_val = ['x_bins_data', 'y_bins_data']
    # Aggregating percentage/std info if necessary
    if return_perc:
        return_val.append('perc_arr_lims')
    # Determining object to return
    if return_dict:
        # Initiating dictionary with output values
        return_obj = {}
        # Populating output object
        for key_val in return_val:
            return_obj[key_val] = xy_dict[key_val]
    else:
        # Initiating output object
        return_obj = [[] for x in range(len(return_val))]
        # Populating output object
        for jj, key_val in enumerate(return_val):
            return_obj[jj] = xy_dict[key_val]

    return return_obj