"""module core.py
This module contains the base classes to define a random variable,
a probability function, or a parameter.
The module is also hosting global dictonaries used to keep track of
the different variables, parameters and probability functions declared.
"""
import copy
import logging
import math
import numpy as np
import operator
import scipy.interpolate
import scipy.optimize
import scipy.signal
import scipy.stats
__all__ = ['Param','PF','RV','logger','get_obj']
_dparams = {} # Dictionary hosting the list of parameters
_dpfs = {} # Dictionary hosting the list of probability functions
_drvs = {} # Dictionary hosting the list of random variables
# Logging system
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
_ch = logging.StreamHandler() # Console handler
_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
_ch.setFormatter(_formatter)
logger.addHandler(_ch)
logger.debug('logger has been set to DEBUG mode')
[docs]class Param(object):
"""Base class to define a PF shape parameter.
Two types of parameters can be built:
* RAW parameters do not depend on any other parameter and have
a value directly associated to them
* DERIVED parameters are obtained from other parameters via an
analytical formula.
Attributes
----------
name : str
Random Variable name
label : str
Random Variable name for printing purposes
value : float
Current numerical value
unc : float
Parameter uncertainty (e.g. after minimization)
bounds : list
Defines, if necessary, the lower and upper bounds
formula : list (optional, only for DERIVED parameters)
List of operators and parameters used to parse an analytic function
strform : str (optional, only for DERIVED parameters)
Representation of the formula as a string
partype : Param.RAW or Param.DERIVED
Tells whether it is a RAW or a DERIVED parameter
const : bool
Tells whether a parameter is fixed during a minimazation process.
It is not a constant in the sense of programming.
poi : bool
Tells whether a parameter is a parameter of interest in an
hypothesis test.
isuptodate : bool
Tells whether value needs to be computed again or not
logger : logging.Logger
message logging system
Examples
--------
Declarations::
>>> import statspy as sp
>>> mu = sp.Param(name="mu",value=10.,label="\\\\mu")
>>> x = sp.Param("x = 10. +- 2.")
>>> x.value
10.0
>>> x.unc
2.0
>>> y = sp.Param("y = 5.", unc=1.)
Operations, building DERIVED parameters::
>>> x + y
x + y = 15.0 +- 2.2360679775
>>> z = x * y
>>> z.name = 'z'
>>> z
z = x * y = 50.0 +- 14.1421356237
>>> x**2
x ** 2 = 100.0 +- 40.0
Possible operations are ``+,-,*,/,**``.
"""
# Define the different parameter types
(RAW,DERIVED) = (0,10)
# Define derivatives for main operators
_derivative_add_sub = [lambda x,y : 1, lambda x,y : 1]
_derivative_mul = [lambda x,y : y, lambda x,y : x]
_derivative_div = [lambda x,y : 1/y, lambda x,y : x/y/y]
def __init__(self, *args, **kwargs):
# Public class members
self.name = kwargs.get('name', None)
self.label = kwargs.get('label', self.name)
self.value = kwargs.get('value', 0.)
self.unc = kwargs.get('unc', 0.)
self.bounds = kwargs.get('bounds', [])
self.formula = kwargs.get('formula', None)
self.strform = kwargs.get('strform', None)
self.const = kwargs.get('const', False)
self.poi = kwargs.get('poi', False)
self.partype = Param.RAW
self.isuptodate = True
self.logger = logging.getLogger('statspy.core.Param')
# Internal class members
self._derivative = kwargs.get('derivative', None)
self._pcov = None
try:
self.logger.debug('args = %s, kwargs = %s',args,kwargs)
foundArgs = self._check_args_syntax(args)
if self.formula != None:
self.partype = Param.DERIVED
#Param._check_formula(self.formula) # <= TODO
Param._update_db(self, self)
self._evaluate()
if self.name != None:
self._register_in_db(self.name)
except:
raise
[docs] def __add__(self, other):
"""Add a parameter to another parameter or a numerical value.
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
new : Param
new parameter which is the sum of self and other
"""
try:
new = Param(formula=[[operator.add, self, other]],
strform=Param._build_str_formula(self,'+',other))
except:
raise
return new
[docs] def __call__(self):
"""Return the parameter value.
Returns
-------
self.value : float
Parameter value possibly recomputed from self.formula
if self.isuptodate is False.
"""
return self.value
[docs] def __div__(self, other):
"""Divide a parameter by another parameter or by a numerical value.
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
new : Param
new parameter which is the ratio of self and other
"""
try:
new = Param(formula=[[operator.div, self, other]],
strform=Param._build_str_formula(self,'/',other))
except:
raise
return new
[docs] def __getattribute__(self, name):
"""Overload __getattribute__ to update the value attribute from the
formula for a DERIVED parameter.
"""
try:
if (name == "value" and self.partype == Param.DERIVED and
(self.name == None or self.isuptodate == False)):
self._evaluate()
if (name == "unc" and self.partype == Param.DERIVED):
self._evaluate_unc()
except:
raise
return object.__getattribute__(self, name)
[docs] def __iadd__(self, other):
"""In-place addition (+=)
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
self : Param
self parameter modified by other
"""
try:
if self.partype == Param.RAW:
self.partype = Param.DERIVED
self.isuptodate = False
self.formula = []
self.formula.append([operator.add, self, other])
self.strform = Param._build_str_formula(self,'+',other)
except:
raise
return self
[docs] def __imul__(self, other):
"""In-place multiplication (*=)
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
self : Param
self parameter modified by other
"""
try:
if self.partype == Param.RAW:
self.partype = Param.DERIVED
self.isuptodate = False
self.formula = []
self.formula.append([operator.mul, self, other])
self.strform = Param._build_str_formula(self,'*',other)
except:
raise
return self
[docs] def __isub__(self, other):
"""In-place subtraction (-=)
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
self : Param
self parameter modified by other
"""
try:
if self.partype == Param.RAW:
self.partype = Param.DERIVED
self.isuptodate = False
self.formula = []
self.formula.append([operator.sub, self, other])
self.strform = Param._build_str_formula(self,'-',other)
except:
raise
return self
[docs] def __mul__(self, other):
"""Multiply a parameter by another parameter or by a numerical value.
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
new : Param
new parameter which is the product of self and other
"""
try:
new = Param(formula=[[operator.mul, self, other]],
strform=Param._build_str_formula(self,'*',other))
except:
raise
return new
[docs] def __pow__(self, other):
"""Raise a parameter the power.
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
new : Param
new parameter which is self raised to the power of other
"""
try:
new = Param(formula=[[operator.pow, self, other]],
strform=Param._build_str_formula(self,'**',other))
except:
raise
return new
[docs] def __radd__(self, other):
"""Add a numerical value to a parameter"""
try:
new = Param(formula=[[operator.add, other, self]],
strform=Param._build_str_formula(other,'+',self))
except:
raise
return new
[docs] def __rdiv__(self, other):
"""Divide a numerical value by a parameter"""
try:
new = Param(formula=[[operator.div, other, self]],
strform=Param._build_str_formula(other,'/',self))
except:
raise
return new
[docs] def __repr__(self):
"""Return Parameter value and formula if DERIVED"""
theStr = self.name + " = " if self.name != None else ""
if self.strform != None: theStr = theStr + self.strform + " = "
theStr = theStr + str(self.value)
if self.unc != 0: theStr = theStr + " +- " + str(self.unc)
return theStr
[docs] def __rmul__(self, other):
"""Multiply a numerival value by a parameter"""
try:
new = Param(formula=[[operator.mul, other, self]],
strform=Param._build_str_formula(other,'*',self))
except:
raise
return new
[docs] def __rpow__(self, other):
"""Raise a numerical value to the power"""
try:
new = Param(formula=[[operator.pow, other, self]],
strform=Param._build_str_formula(other,'**',self))
except:
raise
return new
[docs] def __rsub__(self, other):
"""Subtract a numerical value to a parameter"""
try:
new = Param(formula=[[operator.sub, other, self]],
strform=Param._build_str_formula(other,'-',self))
except:
raise
return new
[docs] def __setattr__(self, name, value):
"""Overload __setattr__ to make sure that quantities based on this
parameter will be updated when its value is modified.
"""
if "value" in self.__dict__ and name == "value":
if hasattr(self, "name") and self.name != None:
update_status(_dparams[self.name])
if name == "name" and hasattr(self, "name"):
if self.name != None:
raise AttributeError('Cannot overwrite parameter name')
self._register_in_db(value)
super(Param, self).__setattr__(name, value)
[docs] def __sub__(self, other):
"""Subtract a parameter to another parameter or a numerical value.
Parameters
----------
self : Param
other : Param, int, long, float
Returns
-------
new : Param
new parameter which is the difference of self and other
"""
try:
new = Param(formula=[[operator.sub, self, other]],
strform=Param._build_str_formula(self,'-',other))
except:
raise
return new
[docs] def get_raw_params(self):
"""Get the list of RAW parameters from a DERIVED parameter"""
raw_params = []
if self.partype != Param.DERIVED: return raw_params
if self.formula == None or type(self.formula) != list:
return raw_params
for op in self.formula:
for ele in op:
if not isinstance(ele, Param): continue
if ele.partype == Param.RAW:
raw_params.append(ele)
elif ele.partype == Param.DERIVED:
raw_params += ele.get_raw_params()
return raw_params
[docs] def unbound_repr(self):
"""Transform the value of the parameter such as it has no bounds.
This method is used with the minimization algorithms which require
only unbound parameters. The transformation formula from a
double-sided or a one-sided parameter to an unbound parameter are
described in Section 1.3.1 of the MINUIT manual:
Fred James, Matthias Winkler, MINUIT User's Guide, June 16, 2004.
Returns
-------
new : float
parameter value within an unbound representation.
"""
new = self.value
if len(self.bounds) != 2: return new # Parameter has no bounds
a = self.bounds[0]
b = self.bounds[1]
if a != None and b != None:
# Parameter with double-sided limits
new = math.asin(2 * (new - a) / (b - a) - 1)
elif a != None:
# Parameter with a lower bound
new = math.sqrt((new - a + 1) * (new - a + 1) - 1)
elif b != None:
# Parameter with an upper bound
new = math.sqrt((b - new + 1) * (b - new + 1) - 1)
return new
[docs] def unbound_to_bound(self, val, unc = 0.):
"""Transform the parameter value from an unbound to a bounded
representation.
This method is used with the minimization algorithms which require
only unbound parameters. The transformation formula from an unbound
parameter to a double-sided or a one-sided parameter are described in
Section 1.3.1 of the MINUIT manual:
Fred James, Matthias Winkler, MINUIT User's Guide, June 16, 2004.
Since the transformation is non-linear, the transformation of the
uncertainty is approximate and based on the error propagation
formula. In particular, when the value is close to its limit, the
uncertainty is not trustable and a more refined analysis should be
performed.
Parameters
----------
val : float
parameter value within an unbound representation
unc : float
parameter uncertainty within an unbound representation
"""
if len(self.bounds) != 2:
self.value = val
self.unc = unc
return self.value, self.unc # Parameter has no bounds
a = self.bounds[0]
b = self.bounds[1]
if a != None and b != None:
# Parameter with double-sided limits
self.value = a + (b - a) / 2 * (math.sin(val) + 1)
self.unc = math.fabs((b - a) / 2 * math.cos(val) * unc)
elif a != None:
# Parameter with a lower bound
self.value = a - 1 + math.sqrt(val * val + 1)
self.unc = val / math.sqrt(val * val + 1) * unc
elif b != None:
# Parameter with an upper bound
self.value = b + 1 - math.sqrt(val * val + 1)
self.unc = val / math.sqrt(val * val + 1) * unc
return self.value, self.unc
def _check_args_syntax(self, args):
if not len(args): return False
if not isinstance(args[0],str):
raise SyntaxError("If an argument is passed to Param without a keyword, it must be a string.")
# Analyse the string
theStr = args[0]
if '=' in theStr:
self.label = self.name = theStr.split('=')[0].strip().lstrip()
self.logger.debug("Found Param name %s", self.name)
theStr = theStr.split('=')[1]
if '+-' in theStr:
self.unc = float(theStr.split('+-')[1].strip().lstrip())
theStr = theStr.split('+-')[0]
self.value = float(theStr.strip().lstrip())
return True
def _evaluate(self):
if self.partype != Param.DERIVED: return
if type(self.formula) != list: return
value = 0.
for op in self.formula:
if isinstance(op, list):
if len(op) == 3:
if isinstance(op[1], Param):
val1 = value if op[1] == self else op[1].value
else:
val1 = op[1]
val2 = op[2].value if isinstance(op[2], Param) else op[2]
value = op[0](val1, val2)
elif len(op) == 2 and isinstance(op[1], Param):
if op[1] == self:
value = op[0](value)
else:
value = op[0](op[1].value)
elif len(op) == 1 and isinstance(op[0], Param):
value = op[0].value
else:
raise TypeError('operation is not recognized')
elif isinstance(op, Param):
value = op.value
else:
raise TypeError('operation type is not recognized')
self.value = value
self.isuptodate = True
return
def _evaluate_unc(self):
"""Propagation of uncertainty."""
if self.partype != Param.DERIVED: return
if type(self.formula) != list: return
value = 0.
unc = 0.
for op in self.formula:
if isinstance(op, list):
if len(op) == 3:
# operators +,-,*,/,**
if op[0] == operator.mul:
derivative = [lambda x,y : y, lambda x,y : x]
elif op[0] == operator.div:
derivative = [lambda x,y : 1/y, lambda x,y : x/y/y]
elif op[0] == operator.pow:
derivative = [lambda x,y : y*(x**(y-1)),
lambda x,y : math.log(x)*math.exp(y) ]
else:
derivative = [lambda x,y : 1, lambda x,y : 1]
corr = 0.
if isinstance(op[1], Param):
val1 = value if op[1] == self else op[1].value
unc1 = unc if op[1] == self else op[1].unc
if type(op[1]._pcov) == list and isinstance(op[2], Param):
for par in op[1]._pcov:
if par[0] == op[2]: corr = par[1]
else:
val1 = op[1]
unc1 = 0.
val2 = op[2].value if isinstance(op[2], Param) else op[2]
unc2 = op[2].unc if isinstance(op[2], Param) else 0.
df1 = (derivative[0])(val1, val2)
df2 = (derivative[1])(val1, val2)
value = op[0](val1, val2)
unc = df1 * df1 * unc1 * unc1 + df2 * df2 * unc2 * unc2
if corr != 0.: unc += df1 * df2 * corr * unc1 * unc2
if unc < 0.: raise SyntaxError('Negative uncertainty.')
unc = math.sqrt(unc)
elif len(op) == 2 and isinstance(op[1], Param):
# mathematical function
if op[1] == self:
value = op[0](value)
else:
value = op[0](op[1].value)
if self._derivative != None:
df = self._derivative(op[1].value)
unc = df * df * op[1].unc * op[1].unc
if unc < 0.: raise SyntaxError('Negative uncertainty.')
unc = math.sqrt(unc)
elif len(op) == 1 and isinstance(op[0], Param):
# simple copy
value = op[0].value
unc = op[0].unc
else:
raise TypeError('operation is not recognized')
elif isinstance(op, Param):
value = op.value
unc = op.unc
else:
raise TypeError('operation type is not recognized')
self.value = value
self.unc = unc
self.isuptodate = True
return
def _register_in_db(self, name):
"""Register a parameter in database."""
if name in _dparams and 'obj' in _dparams[name]:
self.logger.warning('%s already registred, remove existing info',
name)
_dparams[name] = {'rvs':[],'pfs':[],'params':[]}
_dparams[name]['obj'] = self
self.logger.debug('Register new Param: %s with value=%f, bounds=%s',
name,self.value,self.bounds)
if self.formula != None:
self.logger.debug('and derived from formula: %s', self.formula)
return
@staticmethod
def _update_db(par, par_to_add):
"""Add par_to_add to the list of params in database"""
if par.partype == Param.RAW:
if par.name != None:
_dparams[par.name]['params'].append(par_to_add)
return
if not isinstance(par.formula, list) or not len(par.formula): return
for op in par.formula:
if isinstance(op, list):
for ele in op:
if not isinstance(ele, Param): continue
if ele.name == None:
if ele.partype == Param.DERIVED:
Param._update_db(ele, par_to_add)
else:
_dparams[ele.name]['params'].append(par_to_add)
elif isinstance(op, Param):
if op.name == None:
if op.partype == Param.DERIVED:
Param._update_db(op, par_to_add)
else:
_dparams[op.name]['params'].append(par_to_add)
else:
raise TypeError('operation is not recognized')
return
@staticmethod
def _get_strform(par):
strform = None
if isinstance(par, (int,long,float)): return str(par)
if par.name != None:
strform = par.name
elif par.strform != None:
strform = par.strform
return strform
@staticmethod
def _build_str_formula(par1, op, par2):
strform = None
par1_strform = Param._get_strform(par1)
par2_strform = Param._get_strform(par2)
if par1_strform != None and par2_strform != None:
if op != '+' and op != '-':
if isinstance(par1, Param) and par1.name == None:
par1_strform = '(' + par1_strform + ')'
if isinstance(par2, Param) and par2.name == None:
par2_strform = '(' + par2_strform + ')'
strform = '%s %s %s' % (par1_strform, op, par2_strform)
return strform
[docs]class PF(object):
"""Base class to define a Probability Function.
Probability Function is a generic name which includes both the
probability mass function for discrete random variables and the
probability density fucntion for continuous random variables.
The function itself is defined in ``self.func``:
* For a RAW PF, it is a pdf or a pmf of scipy.stats.
* For a DERIVED PF, it is list containing an operator and
pointers to other PFs.
Attributes
----------
name : str (optional)
Function name
func : scipy.stats.distributions.rv_generic, list
Probability Density Function object.
params : statspy.core.Param list
List of shape parameters used to define the pf
norm : Param
Normalization parameter set to 1 by default. It can be different
from 1 when the PF is fitted to data.
lolc : Param, optional
Location parameter
scalle : Param, optional
Scale parameter
isuptodate : bool
Tells whether PF needs to be normalized or not
options : dict
Potential list of options
pftype : PF.RAW or PF.DERIVED
Tells whether a PF is a RAW PF or DERIVED from other PFs
logger : logging.Logger
message logging system
Examples
--------
>>> import statspy as sp
>>> pmf_n = sp.PF("pmf_n=poisson(n;mu)",mu=10.)
"""
# Define the different probability function types
(RAW,DERIVED) = (0,10)
def __init__(self,*args,**kwargs):
# Public class members
self.name = None
self.func = None
self.params = []
self.norm = Param(value=1., const=True)
self.loc = None
self.scale = None
self.isuptodate = False
self.options = kwargs.get('options', {})
self.pftype = PF.RAW
self.logger = logging.getLogger('statspy.core.PF')
# Internal class members
self._cache = None
self._free_params = []
self._pcov = None
self._rvs = []
self._spline_pf = None
self._spline_cdf = None
try:
self.logger.debug('args = %s, kwargs = %s',args,kwargs)
foundArgs = self._check_args_syntax(args)
self._check_kwargs_syntax(kwargs,foundArgs)
if isinstance(self.func, scipy.stats.distributions.rv_generic):
self.isuptodate = True
self._rvs[0][1] = self.func.a # Lower bound
self._rvs[0][2] = self.func.b # Upper bound
if isinstance(self.func, scipy.stats.rv_discrete):
self._rvs[0][3] = RV.DISCRETE
elif isinstance(self.func, scipy.stats.rv_continuous):
self._rvs[0][3] = RV.CONTINUOUS
except:
raise
[docs] def __add__(self,other):
"""Add two PFs.
The norm parameters are also summed.
Parameters
----------
self : PF
other : PF
Returns
-------
new : PF
new pf which is the sum of self and other
"""
try:
norm0 = [self.norm.value, other.norm.value]
self_params = self._get_add_norm_params()
other_params = other._get_add_norm_params()
add_params = self_params + other_params
frac0 = 1./len(add_params)
first = True
for ipar,add_param in enumerate(add_params):
if not ipar: continue
if add_param.norm.partype == Param.RAW:
if add_param.norm.value >= 1.:
add_param.norm.value = frac0
add_param.norm.const = False
add_param.norm.bounds = [0., 1.]
else:
add_param.norm = Param(value=frac0, bounds=[0., 1.],
const=False)
if add_param.name != None:
add_param.name = 'norm_%s' % add_param.name
if first:
add_params[0].norm = 1. - add_param.norm
first = False
else:
add_params[0].norm -= add_param.norm
if add_params[0].name != None:
add_params[0].norm.name = 'norm_%s' % add_params[0].name
for ele in [self_params, other_params]:
if len(ele) < 2: continue
first = True
for ipar,add_param in enumerate(ele):
if ipar == 0: continue
if first:
ele.norm = ele[0].norm + add_param.norm
first = False
else:
ele.norm += add_param.norm
if ele.name != None:
ele.norm.name = 'norm_%s' % ele.name
new = PF(func=[operator.add, self, other])
if norm0[0] != 1 or norm0[1] != 1:
new.norm.value = norm0[0] + norm0[1]
except:
raise
return new
[docs] def __call__(self, *args, **kwargs):
"""Evaluate Probability Function in x
Parameters
----------
args : float, ndarray, optional, multiple values for multivariate pfs
Random Variable(s) value(s)
kwargs : keywork arguments, optional
Shape parameters values
Returns
-------
value : float, ndarray
Probability Function value(s) in x
"""
# Check if self.func contains a pdf() or a pmf() method
if self.pftype == PF.RAW:
method_name = 'pdf'
try:
if isinstance(self.func, scipy.stats.rv_discrete):
method_name = 'pmf'
check_method_exists(obj=self.func,name=method_name)
except:
raise
# Get random variable value(s), mandatory
rv_values = self._get_rv_values(*args, **kwargs)
# Get shape parameters, optional
param_values = self._get_param_values(**kwargs)
# Compute pf value in x
value = None
# - in case of a DERIVED PF, call an operator
if (self.pftype == PF.DERIVED and isinstance(self.func, list) and
len(self.func) == 1 and isinstance(self.func[0], PF)):
value = self.func[0](*args, **kwargs)
elif self.pftype == PF.DERIVED:
if not isinstance(self.func, list) or len(self.func) != 3:
raise SyntaxError('DERIVED function is not recognized.')
op = self.func[0] # operator
if type(op) == str:
redocache = not self.isuptodate or self._spline_pf == None
for name,value in zip(self.rv_names(), rv_values):
if (np.amin(value) < np.amin(self._cache[name]) or
np.amax(value) > np.amax(self._cache[name])):
redocache = True
if redocache: self._build_cache(*rv_values)
value = self._spline_pf(*rv_values)
else:
vals = [0.,0.]
for idx in [1,2]:
the_args = []
for irv,rv in enumerate(self._rvs):
if rv[0] in self.func[idx].rv_names():
the_args.append(rv_values[irv])
vals[idx-1] = self.func[idx](*the_args, **kwargs)
value = op(vals[0],vals[1])
else:
# - in case of a RAW PF, call directly the scipy function
if method_name == 'pmf':
value = self.func.pmf(rv_values[0], *param_values)
else:
value = self.func.pdf(rv_values[0], *param_values)
if self.scale != None and self.scale.value != 0.:
value = value / self.scale.value
return self.norm.value * value
[docs] def __mul__(self, other):
"""Multiply a PF by another PF.
parameters
----------
self : PF
other : PF
returns
-------
new : PF
new PF which is the product of self and other
"""
try:
new = PF(func=[operator.mul, self, other])
except:
raise
return new
[docs] def __rmul__(self, scale):
"""Scale PF normalization value
parameters
----------
self : PF
scale : float
returns
-------
self : PF
original PF with self.norm.value *= scale
"""
try:
self.norm.value *= scale
except:
raise
return self
[docs] def cdf(self, *args, **kwargs):
"""Compute the cumulative distribution function in x.
Parameters
----------
args : ndarray, tuple
Random Variable(s) value(s)
kwargs : keywork arguments, optional
Shape parameters values
Returns
-------
value : float, ndarray
Cumulative distribution function value(s) in x
"""
# Check if self.func contains a cdf() method
if self.pftype == PF.RAW:
try:
check_method_exists(obj=self.func,name='cdf')
except:
raise
# Get random variable value(s), mandatory
rv_values = self._get_rv_values(*args, **kwargs)
# Get shape parameters, optional
param_values = self._get_param_values(**kwargs)
# Compute cdf value in x
# - in case of a DERIVED PF, call an operator
if (self.pftype == PF.DERIVED and isinstance(self.func, list) and
len(self.func) == 1 and isinstance(self.func[0], PF)):
value = self.norm.value * self.func[0].cdf(*args, **kwargs)
return value
elif self.pftype == PF.DERIVED:
if not isinstance(self.func, list) or len(self.func) != 3:
raise SyntaxError('DERIVED function is not recognized.')
op = self.func[0]
if type(op) == str:
redocache = not self.isuptodate or self._spline_cdf == None
for name,value in zip(self.rv_names(), rv_values):
if (np.amin(value) < np.amin(self._cache[name]) or
np.amax(value) > np.amax(self._cache[name])):
redocache = True
if redocache: self._build_cache(*rv_values)
value = self.norm.value * self._spline_cdf(*rv_values)
else:
vals = [0.,0.]
for idx in [1,2]:
the_args = []
for irv,rv in enumerate(self._rvs):
if rv[0] in self.func[idx].rv_names():
the_args.append(rv_values[irv])
vals[idx-1] = self.func[idx].cdf(*the_args, **kwargs)
value = self.norm.value * op(vals[0],vals[1])
return value
# - in case of a RAW PF, call directly the scipy function
return self.norm.value * self.func.cdf(rv_values[0], *param_values)
[docs] def convolve(self, other, **kw):
"""Convolve two PFs which is same as adding two random variables.
The scipy.signal package is used to perform the convolution with
different options set by mode.
parameters
----------
self : PF
other : PF
kw : keywork arguments, dict
May be:
* options : str
Way to perform the convolution. If mode is equal to:
* ``fft`` then ``scipy.signal.fftconvolve`` is used
* ``num`` then ``scipy.signal.convolve`` is used
* ``rvs`` then random variates are used to generate the PF
returns
-------
new : PF
new PF which is the convolution of self and other
Notes
-----
This method is working only for 1-dim pdf/pmf currently.
"""
try:
if not 'mode' in kw: kw['mode'] = 'fft'
new = PF(func=['rvadd', self, other], options=kw)
except:
raise
return new
[docs] def dF(self, x):
"""Compute the uncertainty on PF given the uncertainty on the shape
and norm parameters.
This method can be used to show an error band on your fitted PF.
To compute the uncertainty on the PF, the error propagation formula is
used::
dF(x;th) = (F(x;th+dth) - F(x;th-dth))/2
dF(x)^2 = dF(x;th)^T * corr(th,th') * dF(x;th')
so keep in mind it is only an approximation.
parameters
----------
x : float, ndarray
Random variable(s) value(s)
returns
-------
dF : float, ndarray
Uncertainty on the PF evaluated in x
"""
# Get list of free parameters
self.get_list_free_params()
npars = len(self._free_params)
if npars == 0: return np.zeros(len(x))
popt = np.ndarray(npars)
punc = np.ndarray(npars)
for ipar,par in enumerate(self._free_params):
popt[ipar] = par.value
punc[ipar] = par.unc
# Build the correlation matrix
corr = np.ndarray((npars, npars))
if self._pcov != None:
if self._pcov.shape[0] != npars or self._pcov.shape[1] != npars:
raise SyntaxError('covariance matrix is not defined properly')
for ipar in range(npars):
for jpar in range(npars):
corr[ipar][jpar] = self._pcov[ipar][jpar]
if punc[ipar] != 0.: corr[ipar][jpar] /= punc[ipar]
if punc[jpar] != 0.: corr[ipar][jpar] /= punc[jpar]
else:
corr = np.diag(np.ones(npars))
mcorr = np.asmatrix(corr)
# Compute dF(x;th)
if not isinstance(x, np.ndarray): x = np.asarray([x])
pf_plus = None
pf_minus = None
for ipar in range(npars):
# theta -> theta + deltaTheta
(self._free_params[ipar]).value = popt[ipar] + punc[ipar]
y = self(x)
if pf_plus == None:
pf_plus = y
else:
pf_plus = np.vstack((pf_plus, y))
# theta -> theta - deltaTheta
(self._free_params[ipar]).value = popt[ipar] - punc[ipar]
y = self(x)
if pf_minus == None:
pf_minus = y
else:
pf_minus = np.vstack((pf_minus, y))
dF_th = np.asmatrix(0.5 * (pf_plus - pf_minus))
# Compute dF
dF = (dF_th.T * mcorr) * dF_th
return np.sqrt(np.diag(dF))
[docs] def get_list_free_params(self):
"""Get the list of free parameters."""
self._free_params = []
# Get the list of normalization factors
if (not self.norm.const and self.norm.partype == Param.RAW and
(not self.norm in self._free_params)):
self._free_params.append(self.norm)
if self.pftype == PF.DERIVED and type(self.func) == list:
for ele in self.func:
if not isinstance(ele, PF): continue
if (ele.norm.partype == Param.RAW and not ele.norm.const and
(not ele.norm in self._free_params)):
self._free_params.append(ele.norm)
elif ele.norm.partype == Param.DERIVED:
raw_params = ele.norm.get_raw_params()
for raw_par in raw_params:
if raw_par.const: continue
self._free_params.append(raw_par)
# Get the list of shape parameters
params = copy.copy(self.params)
if self.loc != None: params.append(self.loc)
if self.scale != None: params.append(self.scale)
for par in params:
if par.partype == Param.RAW and not par.const:
self._free_params.append(par)
elif par.partype == Param.DERIVED:
raw_params = par.get_raw_params()
for raw_par in raw_params:
if raw_par.const: continue
if raw_par in self._free_params: continue
self._free_params.append(raw_par)
return self._free_params
[docs] def kurtosis(self, **kw):
"""Estimate the kurtosis of the PF. See PF.mean() for the syntax."""
kurtosis = self._estimate('k', scipy.stats.kurtosis, **kw)
return kurtosis
[docs] def leastsq_fit(self, xdata, ydata, ey=None, dx=None, cond=None, **kw):
"""Fit the PF to data using a least squares method.
The fitting part is performed using the scipy.optimize.leastsq
function. The Levenberg-Marquardt algorithm is used by the 'leastsq'
method to find the minimum values.
When calling this method, all PF parameters are minimized except
the one which are set as 'const'.
Parameters
----------
xdata : ndarray
Values for which ydata are measured and PF must be computed
ydata : ndarray
Observed values (like number of events)
ey : ndarray (optional)
Standard deviations of ydata. If not specified, it takes
sqrt(ydata) as standard deviation.
dx : ndarray (optional)
Array containing bin-width of xdata. It can be used to normalize
the PF to the integral while minimizing.
cond : boolean ndarray (optional)
Boolean array telling if a bin should be used in the fit or not
kw : keyword arguments
Keyword arguments passed to the leastsq method
Returns
-------
free_params : statspy.core.Param list
List of the free parameters used during the fit. Their 'value'
and 'unc' arguments are extracted from minimization.
pcov : 2d array
Estimated covariance matrix of the free parameters.
chi2min : float
Least square sum evaluated in popt.
pvalue : float
p-value = P(chi2>chi2min,ndf) with P a chi2 distribution and
ndf the number of degrees of freedom.
"""
# Define parameters which should be minimized and set initial values
self.get_list_free_params()
p0 = np.ones(len(self._free_params))
for ipar,par in enumerate(self._free_params):
p0[ipar] = par.unbound_repr()
# Compute weights
if ey == None: ey = np.sqrt(ydata)
ey[ey == 0] = 1.
if cond == None:
weight = 1./np.asarray(ey)
else:
weight = cond/np.asarray(ey)
# Call the leastsq method
if dx == None: dx = np.ones(xdata.shape)
args = (xdata, ydata, weight, dx)
res = scipy.optimize.leastsq(self._leastsq_function, p0,
args=args, full_output=1, **kw)
# Manage results
(popt, self._pcov, infodict, errmsg, ier) = res
self.logger.debug('Error message from leastsq: ' + str(ier) +
" " + errmsg)
if ier not in [1,2,3,4]:
msg = "Optimal parameters not found: " + errmsg
raise RuntimeError(msg)
chi2min = (self._leastsq_function(popt, *args)**2).sum()
if (len(ydata) > len(p0)) and self._pcov is not None:
ndf = len(ydata)-len(p0)
self._pcov = self._pcov * chi2min / ndf
pvalue = scipy.stats.chi2.sf(chi2min, ndf)
for ipar,par in enumerate(self._free_params):
unc = 0.
if (self._pcov)[ipar][ipar] >= 0.:
unc = math.sqrt((self._pcov)[ipar][ipar])
val2, unc2 = par.unbound_to_bound(popt[ipar], unc)
par._pcov = []
for jpar, par2 in enumerate(self._free_params):
if unc == 0.: continue
self._pcov[ipar][jpar] *= (unc2 / unc)
self._pcov[jpar][ipar] *= (unc2 / unc)
if jpar != ipar:
par._pcov.append([par2, self._pcov[jpar][ipar]])
else:
self._pcov = np.inf
pvalue = np.inf
return self._free_params, self._pcov, chi2min, pvalue
[docs] def loc(self, loc):
"""Derive a PF from another PF via a location parameter.
parameters
----------
self : PF
loc : Param, value
returns
-------
new : PF
new PF with x -> x + loc
"""
try:
new = PF(func=[self])
if isinstance(loc, Param):
new.loc = loc
else:
new.loc = Param(value=loc)
except:
raise
return new
[docs] def logpf(self, *args, **kwargs):
"""Compute the logarithm of the PF in x.
Parameters
----------
args : ndarray, tuple
Random Variable(s) value(s)
kwargs : keywork arguments, optional
Shape parameters values
Returns
-------
value : float, ndarray
Logarithm of the Probability Function value(s) in x
"""
# Check if self.func contains a logpdf() or a logpmf() method
if self.pftype == PF.RAW:
method_name = 'logpdf'
try:
if isinstance(self.func, scipy.stats.rv_discrete):
method_name = 'logpmf'
check_method_exists(obj=self.func,name=method_name)
except:
raise
# Get random variable value(s), mandatory
rv_values = self._get_rv_values(*args, **kwargs)
# Get shape parameters, optional
param_values = self._get_param_values(**kwargs)
# Compute logpf value in x
value = None
# - in case of a DERIVED PF, call an operator
if (self.pftype == PF.DERIVED and isinstance(self.func, list) and
len(self.func) == 1 and isinstance(self.func[0], PF)):
value = self.func[0].logpf(*args, **kwargs)
if self.scale != None and self.scale.value != 0.:
value = value - self.scale.value
if self.pftype == PF.DERIVED:
if not isinstance(self.func, list) or len(self.func) != 3:
raise SyntaxError('DERIVED function is not recognized.')
op = self.func[0]
if op == operator.add:
value = np.log(self(*args, **kwargs))
elif op == operator.mul:
vals = [0.,0.]
for idx in [1,2]:
the_args = []
for irv,rv in enumerate(self._rvs):
if rv[0] in self.func[idx].rv_names():
the_args.append(rv_values[irv])
vals[idx-1] = self.func[idx].logpf(*the_args, **kwargs)
value = operator.add(vals[0],vals[1])
if self.scale != None and self.scale.value != 0.:
value = value - self.scale.value
else:
raise NotImplementedError('Not yet possible...')
else:
# - in case of a RAW PF, call directly the scipy function
if method_name == 'logpmf':
value = self.func.logpmf(rv_values[0], *param_values)
else:
value = self.func.logpdf(rv_values[0], *param_values)
if self.scale != None and self.scale.value != 0.:
value = value - self.scale.value
return self.norm.value + value
[docs] def maxlikelihood_fit(self, data, **kw):
"""Fit the PF to data using the maximum likelihood estimator method.
The fitting part is performed using one of the scipy.optimize
minimization function. By default scipy.optimize.fmin is used, i.e.
the downhill simplex algorithm.
When calling this method, all PF parameters are minimized except
the one which are set as 'const' before calling the method.
Parameters
----------
data : ndarray, tuple
Data used in the computation of the (log-)likelihood function
kw : keyword arguments (optional)
Keyword arguments such as
optimizer : scipy.optimize function
Function performing the minimization. A list of minimizer is
available from scipy.optimize. If you provide your own, the
callable function must be defined with func and x0 as the
first two arguments. Data are passed via the args.
Returns
-------
free_params : statspy.core.Param list
List of the free parameters used during the fit. Their 'value'
arguments are extracted from the minimization process.
nnlfmin : float
Minimal value of the negative log-likelihood function
"""
# Define parameters which should be minimized and set initial values
self.get_list_free_params()
p0 = np.ones(len(self._free_params))
for ipar,par in enumerate(self._free_params):
p0[ipar] = par.unbound_repr()
# Define and call the optimizer
optimizer = kw.get('optimizer', scipy.optimize.fmin)
popt = optimizer(self._nllf, p0, args=(data, ), disp=0)
# Manage results
for ipar,par in enumerate(self._free_params):
val2, unc2 = par.unbound_to_bound(popt[ipar])
par.value = val2
nllfmin = self.nllf(data)
return self._free_params, nllfmin
[docs] def mean(self, **kw):
"""Estimate the mean of the PF.
Warning:
* In the case of a RAW PF from ``scipy.stats``, it calls the ``stats``
method and therefore returns the expected value.
* In the case of a DERIVED PF, it returns an estimate derived from
random variates and using the ``numpy.mean`` function.
Parameters
----------
kw : keywork arguments, optional
Shape parameters values
Returns
-------
mean : ndarray
"""
mean = self._estimate('m', np.mean, **kw)
return mean
[docs] def nllf(self, data, **kw):
"""Evaluate the negative log-likelihood function::
nllf = -sum_i(log(pf(x_i;params))
``sum`` runs over the x-variates defined in data array.
Parameters
----------
data : ndarray, tuple
x - variates used in the computation of the likelihood
kw : keyword arguments (optional)
Specify any Parameter name of the considered PF
Returns
-------
nllf : float
Negative log-likelihood function
"""
# Update values of non-const PF parameters if specified in **kw
for par in self._free_params:
if par.name in kw:
par.value = kw[par.name]
# Compute nllf
if isinstance(data, tuple):
nllf = -1 * np.sum(self.logpf(*data))
else:
nllf = -1 * np.sum(self.logpf(data))
return nllf
[docs] def pllr(self, data, **kw):
"""Evaluate the profile log-likelihood ratio ( * -2 )
The profile likelihood ratio is defined by::
l = L(x|theta_r,\hat{\hat{theta_s}}) / L(x|\hat{theta_r},\hat{theta_s})
The profile log-likehood ratio is then::
q = -2 * log(l)
Where
* L is the Likelihood function (computed via data)
* theta_r is the list of parameters of interest (Param.poi = True)
* theta_s is the list of nuisance parameters
* hat or double hat refers to the unconditional and conditional
maximum likelilood estimates of the parameters respectively.
pllr is used as a test statistics for problems with numerous
nuisance parameters. Asymptotically, the pllr PF is described by a
chi2 distribution (Wilks theorem).
Further information on the likelihood ratio can be found in Chapter
22 of "Kendall's Advanced Theory of Statistics, Volume 2A".
Parameters
----------
data : ndarray, tuple
x - variates used in the computation of the likelihood
kw : keyword arguments (optional)
Specify any Parameter of interest name of the considered PF,
or any option used by the method maxlikelihood_fit.
Returns
-------
pllf : float
Profile log-likelihood ratio times -2
"""
# Find the free parameters and the parameters of interest
# Update values of non-const PF parameters if specified in **kw
self.get_list_free_params()
lpois = []
for par in self._free_params:
if not par.poi: continue
lpois.append(par)
if par.name in kw:
par.value = kw[par.name]
# Compute the conditional nllf (pois are fixed)
for idx,poi in enumerate(lpois):
poi.const = True
cond_params, cond_nllf = self.maxlikelihood_fit(data, **kw)
for idx,poi in enumerate(lpois):
poi.const = False
# Compute the unconditional nllf (pois are treated as free parameters)
uncond_params, uncond_nllf = self.maxlikelihood_fit(data, **kw)
# Evaluate the pllr
pllr = 2. * (cond_nllf - uncond_nllf)
return pllr
[docs] def rvadd(self, other, **kw):
"""Operation equivalent to adding two random variables."""
try:
new = self.convolve(other, **kw)
except:
raise
return new
[docs] def rvdiv(self, other, **kw):
"""Operation equivalent to dividing two random variables."""
try:
new = PF(func=['rvdiv', self, other], options=kw)
except:
raise
return new
[docs] def rvmul(self, other, **kw):
"""Operation equivalent to multiplying two random variables."""
try:
new = PF(func=['rvmul', self, other], options=kw)
except:
raise
return new
[docs] def rvs(self, **kwargs):
"""Get random variates from a PF
Keyword arguments
-----------------
size : int
Number of random variates
mu, sigma,... : float
Any parameter name used while declaring the PF
Returns
-------
data : ndarray
Array of random variates
Examples
--------
>>> import statspy as sp
>>> pdf_x = sp.PF("pdf_x=norm(x;mu=20,sigma=5)")
>>> data = pdf_x.rvs(size=1000)
"""
try:
auto_loc = False
if isinstance(self.func, list):
if len(self.func) == 3:
auto_loc = True
if self.func[0] == operator.add:
data1 = self.func[1].rvs(**kwargs)
data2 = self.func[2].rvs(**kwargs)
data3 = scipy.stats.uniform.rvs(size=len(data1))
cond = (data3 < self.func[1].norm.value)
data = cond * data1 + (1 - cond) * data2
elif self.func[0] == 'rvadd':
data1 = self.func[1].rvs(**kwargs)
data2 = self.func[2].rvs(**kwargs)
data = data1 + data2
elif self.func[0] == 'rvsub':
data1 = self.func[1].rvs(**kwargs)
data2 = self.func[2].rvs(**kwargs)
data = data1 - data2
elif self.func[0] == 'rvmul':
data1 = self.func[1].rvs(**kwargs)
data2 = self.func[2].rvs(**kwargs)
data = data1 * data2
elif self.func[0] == 'rvdiv':
data1 = self.func[1].rvs(**kwargs)
data2 = self.func[2].rvs(**kwargs)
data = data1 / data2
else:
raise NotImplementedError('Not yet implemented...')
elif len(self.func) == 1 and isinstance(self.func[0], PF):
data = self.func[0].rvs(**kwargs)
else:
raise NotImplementedError('Not yet implemented...')
else:
method_name = "rvs"
check_method_exists(obj=self.func,name=method_name)
shape_params = []
for param in self.params:
if param.name in kwargs:
param.value = kwargs[param.name]
shape_params.append(param.value)
data = self.func.rvs(*shape_params, **kwargs)
if self.scale != None and self.scale.value != 0.:
data = data * self.scale.value
if self.loc != None and not auto_loc:
data = data + self.loc.value
except:
raise
return data
[docs] def rvsub(self, other, **kw):
"""Operation equivalent to subtracting two random variables."""
try:
if not 'mode' in kw: kw['mode'] = 'fft'
new = PF(func=['rvsub', self, other], options=kw)
except:
raise
return new
[docs] def rv_names(self):
"""Return a list of RV names."""
rv_names = [ rv[0] for rv in self._rvs ]
return rv_names
[docs] def scale(self, scale):
"""Derive a PF from another PF via a scale parameter.
parameters
----------
self : PF
scale : Param, value
returns
-------
new : PF
new PF with x -> x * scale
"""
try:
new = PF(func=[self])
if isinstance(scale, Param):
new.scale = scale
else:
new.scale = Param(value=scale)
except:
raise
return new
[docs] def sf(self, *args, **kwargs):
"""Compute the survival function (1 - cdf) in x.
Parameters
----------
args : ndarray, tuple
Random Variable(s) value(s)
kwargs : keywork arguments, optional
Shape parameters values
Returns
-------
value : float, ndarray
Survival function value(s) in x
"""
# Check if self.func contains an sf() method
if self.pftype == PF.RAW:
try:
check_method_exists(obj=self.func,name='sf')
except:
raise
# Get random variable value(s), mandatory
rv_values = self._get_rv_values(*args, **kwargs)
# Get shape parameters, optional
param_values = self._get_param_values(**kwargs)
# Compute sf value in x
# - in case of a DERIVED PF, call an operator
if (self.pftype == PF.DERIVED and isinstance(self.func, list) and
len(self.func) == 1 and isinstance(self.func[0], PF)):
value = self.norm.value * self.func[0].sf(*args, **kwargs)
return value
elif self.pftype == PF.DERIVED:
if not isinstance(self.func, list) or len(self.func) != 3:
raise SyntaxError('DERIVED function is not recognized.')
op = self.func[0]
if type(op) == str:
redocache = not self.isuptodate or self._spline_cdf == None
for name,value in zip(self.rv_names(), rv_values):
if (np.amin(value) < np.amin(self._cache[name]) or
np.amax(value) > np.amax(self._cache[name])):
redocache = True
if redocache: self._build_cache(*rv_values)
value = 1 - self.norm.value * self._spline_cdf(*rv_values)
else:
vals = [0.,0.]
for idx in [1,2]:
the_args = []
for irv,rv in enumerate(self._rvs):
if rv[0] in self.func[idx].rv_names():
the_args.append(rv_values[irv])
vals[idx-1] = self.func[idx].sf(*the_args, **kwargs)
value = self.norm.value * op(vals[0],vals[1])
return value
# - in case of a RAW PF, call directly the scipy function
return self.norm.value * self.func.sf(rv_values[0], *param_values)
[docs] def skew(self, **kw):
"""Estimate the skewness of the PF. See PF.mean() for the syntax."""
skew = self._estimate('s', scipy.stats.skew, **kw)
return skew
[docs] def std(self, **kw):
"""Estimate the standard deviation of the PF.
See PF.mean() for the syntax."""
var = self._estimate('v', np.var, **kw)
return np.sqrt(var)
[docs] def var(self, **kw):
"""Estimate the variance of the PF. See PF.mean() for the syntax."""
var = self._estimate('v', np.var, **kw)
return var
def _build_cache(self, rv_values=None):
if not isinstance(self.func, list):
raise SyntaxError('_build_cache, self.func is not a list.')
if len(self.func) < 3:
raise SyntaxError('self.func has a size lower than 3.')
if type(self.func[0]) != str:
raise SyntaxError('First element of self.func should be a str.')
if len(self.func[1]._rvs) != len(self.func[2]._rvs):
raise SyntaxError('self and other should have the same number of rvs.')
for rv1,rv2 in zip(self.func[1]._rvs,self.func[2]._rvs):
if rv1[3] == rv2[3]: continue
raise SyntaxError('%s and %s should be of the same type.' % (rv1[0], rv2[0]))
(getattr(self, ('_'+self.func[0])))(rv_values=rv_values)
self.isuptodate = True
return
def _build_cache_rvs(self):
"""Generate random variates and build an histogram"""
size0 = max(10**6, 100000 * 10**(len(self._rvs) - 1))
size = self.options.get('size', size0)
data = self.rvs(size=size)
if self._rvs[0][3] == RV.DISCRETE:
xmin = data.min() - 0.5
xmax = data.max() + 0.5
nbins = xmax - xmin
pf, bins = np.histogram(data, bins=nbins, range=(xmin, xmax))
else:
pf, bins = np.histogram(data, bins=100)
rv_name = self._rvs[0][0]
dtype = [(rv_name,float),('pf',float),('cdf',float)]
self._cache = np.recarray(pf.shape, dtype=dtype)
self._cache[rv_name] = 0.5*(bins[1:] + bins[:-1]) # Bin centers
self._cache.pf = pf # Bin contents
if self._rvs[0][3] == RV.CONTINUOUS:
dx = bins[1:] - bins[:-1] # Bin widths
integral = (self._cache.pf * dx).sum()
self._cache.cdf = np.cumsum(self._cache.pf * dx)
else:
integral = (self._cache.pf).sum()
self._cache.cdf = np.cumsum(self._cache.pf)
self.norm.value = 1. / integral
def _build_spline(self):
if self._cache == None:
raise SyntaxError('cache is empty, cannot build the spline')
if len(self._rvs) > 2: return
if len(self._rvs) == 1:
name = self._rvs[0][0]
method = self.options.get('spline',
scipy.interpolate.UnivariateSpline)
self._spline_pf = method(self._cache[name], self._cache.pf)
self._spline_cdf = method(self._cache[name], self._cache.cdf)
elif len(self._rvs) == 2:
method = self.options.get('spline',
scipy.interpolate.RectBivariateSpline)
name1 = self._rvs[0][0]
name2 = self._rvs[1][0]
self._spline_pf = method(self._cache[name1], self._cache[name2],
self._cache.pf)
self._spline_cdf = method(self._cache[name1], self._cache[name2],
self._cache.cdf)
return
def _check_args_syntax(self, args):
if not len(args): return False
if not isinstance(args[0],str):
raise SyntaxError("If an argument is passed to PF without a keyword, it must be a string.")
# Analyse the string
theStr = args[0]
if not '(' in theStr:
raise SyntaxError("No pf found in %s" % theStr)
if not ')' in theStr:
raise SyntaxError("Paranthesis is not closed in %s" % theStr)
func_name = theStr.split('(')[0].strip()
if '=' in func_name:
self.name = func_name.split('=')[0].strip().lstrip()
self.norm.name = 'norm_%s' % self.name
self.logger.debug("Found PF name %s", self.name)
func_name = func_name.split('=')[1]
if len(func_name.split()): func_name = func_name.split()[-1]
if not func_name in scipy.stats.__all__:
raise SyntaxError("%s is not found in scipy.stats" % func_name)
self.logger.debug("Found scipy.stats function named %s",func_name)
rvNames = theStr.split('(')[1].split(')')[0].strip().lstrip()
parNames = rvNames
if ';' in rvNames:
rvNames = rvNames.split(';')[0].strip().lstrip()
parNames = parNames.split(';')[1].strip().lstrip()
elif '|' in rvNames:
rvNames = rvNames.split('|')[0].strip().lstrip()
parNames = parNames.split('|')[1].strip().lstrip()
else:
parNames = None
for rv_name in rvNames.split(','):
if not rv_name in self.rv_names():
self._rvs.append([rv_name,-np.inf,np.inf,RV.UNKNOWN])
lpars = []
if parNames != None:
for par_name in parNames.split(','):
lpars.append(par_name.strip().lstrip())
self._declare(func_name,lpars)
return True
def _check_kwargs_syntax(self, kwargs, foundArgs):
if not len(kwargs): return False
if 'name' in kwargs:
if self.name != None:
raise SyntaxError("self.name is already set to %s" % self.name)
self.name = kwargs['name']
self.norm.name = 'norm_%s' % self.name
for par in self.params:
if par.name == None: continue
if not par.name in _dparams: continue
_dparams[par.name]['pfs'].append(self.name)
if not foundArgs and not 'func' in kwargs:
raise SyntaxError("You cannot declare a PF without specifying a function to caracterize it.")
if 'func' in kwargs:
if self.func != None:
raise SyntaxError("self.func already exists.")
self.func = kwargs['func']
if type(self.func) == list:
self.pftype = PF.DERIVED
for ele in self.func:
if not isinstance(ele, PF): continue
for par in ele.params:
if not par in self.params: self.params.append(par)
for rv in ele._rvs:
if (type(self.func[0]) == str and
self.func[0].startswith('rv')):
# For operations on RVs, the number of RVs is
# equal to the number of RVs for func[1] or
# func[2]
if len(self._rvs) < len(self.func[1]._rvs):
self._rvs.append(rv)
elif not rv[0] in self.rv_names():
self._rvs.append(rv)
if len(self.func) > 1 and type(self.func[0]) == str:
# Tricky operators like convolution requiring a cache
self._build_cache()
else:
self.pftype = PF.RAW
if 'param' in kwargs:
if len(self.params) != 0:
raise SyntaxError("self.params are already declared.")
if not isinstance(kwargs['params'], list):
raise SyntaxError("params should be a list.")
self.params = kwargs['params']
for par in self.params:
if par.name == None or self.name == None: continue
if not par.name in _dparams: continue
_dparams[par.name]['pfs'].append(self.name)
for param in self.params:
if param.name in kwargs and kwargs[param.name] != param.value:
param.value = kwargs[param.name]
self.logger.debug('%s value is updated to %f',
param.name,param.value)
return True
def _declare(self, func_name, lpars):
# Declare functional form of PF (no shape parameter specified yet)
self.func = getattr(scipy.stats,func_name)
self.pftype = PF.RAW
# Declare/Update parameters
for parStr in lpars:
parName = parStr.split('=')[0].strip().lstrip()
parVal = 0.
if '=' in parStr:
parVal = float(parStr.split('=')[1].strip().lstrip())
if not parName in _dparams:
_dparams[parName] = {'rvs':[],'pfs':[]}
_dparams[parName]['obj'] = Param(name=parName,value=parVal)
if self.name != None:
if not self.name in _dparams[parName]['pfs']:
_dparams[parName]['pfs'].append(self.name)
else:
if not self in _dparams[parName]['pfs']:
_dparams[parName]['pfs'].append(self)
self.params.append(_dparams[parName]['obj'])
return
def _estimate(self, stats, method, **kw):
if self.pftype == PF.RAW:
method_name = 'stats'
try:
check_method_exists(obj=self.func,name=method_name)
if not stats in ['m','v','s','k']:
raise SyntaxError("stats str should be among ['mvsk']")
except:
raise
# Get shape parameters, optional
param_values = self._get_param_values(**kw)
estimate = self.func.stats(*param_values, moments=stats)
else:
if not 'size' in kw: kw['size'] = 10000
data = self.rvs(**kw)
estimate = method(data)
return estimate
def _get_add_norm_params(self):
norm_params = []
if self.pftype == PF.DERIVED and self.func[0] == operator.add:
for ele in self.func:
if not isinstance(ele, PF): continue
raw_norm_params += ele._get_add_norm_params()
elif self.norm.partype == PF.RAW:
norm_params.append(self)
return norm_params
def _get_bounds(self):
bounds = []
for rv in self._rvs:
a = rv[1]
b = rv[2]
mean = self.mean()
if a == -np.inf or b == np.inf:
nsigma = 10
std = self.std()
if a == -np.inf:
c = mean - nsigma * std
else:
c = a
if b == np.inf:
d = mean + nsigma * std
else:
d = b
else:
c = a
d = b
if rv[3] == RV.DISCRETE: mean = round(mean)
bounds.append([a, b, c, d, mean])
return bounds
def _get_param_values(self, **kwargs):
param_values = [0.] * len(self.params)
for ipar,param in enumerate(self.params):
if param.name in kwargs and kwargs[param.name] != param.value:
param.value = kwargs[param.name]
self.logger.debug('%s value is updated to %f',
param.name,param.value)
param_values[ipar] = param.value
return param_values
def _get_rv_values(self, *args, **kwargs):
rv_values = []
if len(args):
for arg in args:
rv_values.append(arg)
else:
rv_values = []
for rv_name in self.rv_names():
if rv_name in kwargs: rv_values.append(kwargs[rv_name])
if self.pftype == PF.RAW and len(rv_values) != len(self._rvs):
raise SyntaxError('Provide %s input arguments for rvs' %
len(self._rvs))
if type(rv_values[0]) == float:
self.logger.debug('rv values=%s', rv_values)
if self.loc != None and len(rv_values) == 1:
rv_values[0] = rv_values[0] - self.loc.value
if (self.scale != None and len(rv_values) == 1 and
self.scale.value != 0.):
rv_values[0] = rv_values[0] / self.scale.value
return rv_values
def _leastsq_function(self, params, xdata, ydata, weight, dx):
"""Function used by scipy.optimize.leastsq"""
# Update values of non-const PF parameters
for ipar,par in enumerate(self._free_params):
par.unbound_to_bound(params[ipar])
# Return delta = (PF(x) - y)/sigma
delta = weight * (self(xdata) * dx - ydata)
return delta
def _nllf(self, params, data):
"""Evaluate the negative log-likelihood function using unbound
parameters"""
# Update values of non-const PF parameters
for ipar,par in enumerate(self._free_params):
par.unbound_to_bound(params[ipar])
# Compute nllf
if isinstance(data, tuple):
nllf = -1 * np.sum(self.logpf(*data))
else:
nllf = -1 * np.sum(self.logpf(data))
return nllf
def _rvadd(self, rv_values=None):
"""Convolve numerically two PFs and store the result in _cache."""
mode = self.options.get('mode', 'fft')
if mode == 'num' or mode == 'fft':
# Define the rv bounds
rv1_bounds = self.func[1]._get_bounds()
rv2_bounds = self.func[2]._get_bounds()
bounds = []
npts = self.options.get('size', 1001)
for rv1,rv2 in zip(rv1_bounds,rv2_bounds):
mean = None
if self.func[0] == 'rvadd':
a = rv1[0] + rv2[0]
b = rv1[1] + rv2[1]
c = rv1[2] + rv2[2]
d = rv1[3] + rv2[3]
if a == -np.inf and b == np.inf:
if self.loc == None: self.loc = Param(value=1.)
self.loc.value = rv1[4] + rv2[4]
c = c - self.loc.value
d = d - self.loc.value
else:
a = rv1[0] - rv2[1]
b = rv1[1] - rv2[0]
c = rv1[2] - rv2[3]
d = rv1[3] - rv2[2]
if self.loc == None: self.loc = Param(value=1.)
self.loc.value = rv1[4] - rv2[4]
wdt = (d - c) / 2
c = -wdt
d = wdt
bounds.append([a, b, c, d])
# TODO, algo is problematic when > 1 rv
for rv,bound in zip(self._rvs,bounds):
if rv[3] == RV.CONTINUOUS: continue
bound[2] = a = round(bound[2])
bound[3] = b = round(bound[3])
if b - a + 1 > npts:
step = round((b - a)/float(npts - 1))
npts = (b - a)/step + 1
else:
npts = b - a + 1
# Arrays of axis coordinates
dtype = []
for rv in self._rvs:
if rv[3] == RV.CONTINUOUS:
dtype.append((rv[0],float))
else:
dtype.append((rv[0],int))
dtype.append(('pf',float))
dtype.append(('cdf',float))
self._cache = np.recarray((npts,),dtype=dtype)
args1 = []
args2 = []
bin_area = 1.
step = None
for rv,bound in zip(self._rvs,bounds):
rv[1] = bound[0]
rv[2] = bound[1]
self._cache[rv[0]], step = np.linspace(bound[2], bound[3],
num=npts, retstep=True)
bin_area *= step
if self.loc != None:
args1.append(self._cache[rv[0]] + rv1_bounds[0][4])
args2.append(self._cache[rv[0]] + rv2_bounds[0][4])
else:
args1.append(self._cache[rv[0]])
args2.append(self._cache[rv[0]])
self.logger.debug('%s bounds set to %s' % (rv[0],bound))
# Evaluate pf values for both input pf
pf1 = self.func[1](*args1)
pf2 = self.func[2](*args2)
if self.func[0] == 'rvsub': pf2 = pf2[::-1]
# Perform the convolution
if mode == 'num':
pf = scipy.signal.convolve(pf1, pf2, mode='full')
else:
pf = scipy.signal.fftconvolve(pf1, pf2, mode='full')
if self.func[0] == 'rvadd' and self._rvs[0][1] != -np.inf:
start = 0
elif self.func[0] == 'rvsub' and 0 in self._cache[rv[0]]:
start = npts - 1 - np.where(self._cache[rv[0]] == 0)[0][0]
else:
start = (npts - 2) / 2
self._cache.pf = pf[start:start+npts]
# Cumulative distribution function
self._cache.cdf = np.cumsum(self._cache.pf * bin_area)
# Normalization factor
integral = np.sum(self._cache.pf * bin_area)
self.norm.value = 1. / integral
# Interpolation setup
if not 'spline' in self.options:
if len(self._rvs) == 1:
self.options['spline'] = scipy.interpolate.interp1d
elif len(self._rvs) == 2:
self.options['spline'] = scipy.interpolate.interp2d
else:
self.options['spline'] = scipy.interpolate.griddata
elif len(self._rvs) == 1:
# Generate random variated and build an histogram
self._build_cache_rvs()
else:
raise NotImplementedError('Sorry, rvadd operation not valid...')
# Interpolate
self._build_spline()
return
def _rvdiv(self, rv_values=None):
self._build_cache_rvs()
# Interpolate
self._build_spline()
return
def _rvmul(self, rv_values=None):
self._build_cache_rvs()
# Interpolate
self._build_spline()
return
def _rvsub(self, rv_values=None):
self._rvadd(rv_values=rv_values)
return
[docs]class RV(object):
"""Base class to define a Random Variable.
Attributes
----------
name : str
Random Variable name
pf : statspy.core.PF
Probability Function object associated to a Random Variable
rvtype : RV.CONTINUOUS or RV.DISCRETE
Random Variable type
logger : logging.Logger
message logging system
Examples
--------
>>> import statspy as sp
>>> X = sp.RV("norm(x|mu=10,sigma=2)")
"""
# Define the different random value types
(UNKNOWN,CONTINUOUS,DISCRETE) = (0,10,100)
def __init__(self, *args, **kwargs):
self.name = ""
self.pf = None
self.rvtype = RV.UNKNOWN
self.logger = logging.getLogger('statspy.core.RV')
try:
self.logger.debug('args = %s, kwargs = %s',args, kwargs)
foundArgs = self._check_args_syntax(args, kwargs)
self._check_kwargs_syntax(kwargs, foundArgs)
except:
raise
[docs] def __add__(self, other):
"""Add two random variables or a random variable by a parameter.
The associated PF of the sum of the two random variables is
a convolution of the two PFs.
Caveat: this method assumes *independent* random variables.
Parameters
----------
self : RV
other : RV, Param, value
Returns
-------
new : RV
new RV which is the sum of self and other
"""
try:
if isinstance(other, RV):
new = RV(pf=self.pf.rvadd(other.pf))
else:
new = RV(pf=self.pf.loc(other))
except:
raise
return new
[docs] def __call__(self, **kwargs):
"""Get random variates
Keyword arguments
-----------------
size : int
Number of random variates
mu, sigma,... : float
Any parameter name used while declaring the PF
Returns
-------
data : ndarray
Array of random variates
Examples
--------
>>> import statspy as sp
>>> X = sp.RV("norm(x;mu=20,sigma=5)")
>>> x = X(size=1000)
"""
try:
if not isinstance(self.pf, PF):
raise SyntaxError('No PF associated to this random variable.')
data = self.pf.rvs(**kwargs)
except:
raise
return data
[docs] def __div__(self, other):
"""Divide two random variables or a random variable by a parameter.
Caveat: this method assumes *independent* random variables.
Parameters
----------
self : RV
other : RV, Param, value
Returns
-------
new : RV
new RV which is the ratio of self and other
"""
try:
if isinstance(other, RV):
new = RV(pf=self.pf.rvdiv(other.pf))
else:
new = RV(pf=self.pf.scale((1./other)))
except:
raise
return new
[docs] def __mul__(self, other):
"""Multiply two random variables or a random variable by a parameter.
Caveat: this method assumes *independent* random variables.
Parameters
----------
self : RV
other : RV, Param, value
Returns
-------
new : RV
new RV which is the product of self and other
"""
try:
if isinstance(other, RV):
new = RV(pf=self.pf.rvmul(other.pf))
else:
new = RV(pf=self.pf.scale(other))
except:
raise
return new
[docs] def __radd__(self, other):
"""Add a random variable and a parameter.
Parameters
----------
self : RV
other : Param, value
Returns
-------
new : RV
new RV which is the sum of self and other
"""
try:
new = RV(pf=self.pf.loc(other))
except:
raise
return new
[docs] def __rmul__(self, other):
"""Multiply a parameter by a random variable.
Parameters
----------
self : RV
other : Param, value
Returns
-------
new : RV
new RV which is the product of self and other
"""
try:
new = RV(pf=self.pf.scale(other))
except:
raise
return new
[docs] def __sub__(self, other):
"""Subtract two random variables or a random variable by a parameter.
Caveat: this method assumes *independent* random variables.
Parameters
----------
self : RV
other : RV, Param, value
Returns
-------
new : RV
new RV which is the difference of self and other
"""
try:
if isinstance(other, RV):
new = RV(pf=self.pf.rvsub(other.pf))
else:
new = RV(pf=self.pf.loc((-1.)*other))
except:
raise
return new
def _check_args_syntax(self, args, kwargs):
if not len(args): return False
if not isinstance(args[0],str):
raise SyntaxError("If an argument is passed to PF without a keyword, it must be a string.")
self.pf = PF(*args, **kwargs)
if len(self.pf._rvs) != 1:
raise SyntaxError("One and only one RV should be found in %s." % args)
self.name = self.pf._rvs[0][0]
self.rvtype = self.pf._rvs[0][3]
if self.pf.name == None:
if self.rvtype == RV.CONTINUOUS:
self.pf.name = 'pdf_%s' % self.name
else:
self.pf.name = 'pmf_%s' % self.name
return True
def _check_kwargs_syntax(self, kwargs, foundArgs):
if not len(kwargs): return False
if not foundArgs and not 'pf' in kwargs:
raise SyntaxError("You cannot declare a Random Variable without specifying a pf.")
if 'name' in kwargs:
if self.name != None:
raise SyntaxError("self.name is already set to %s" % self.name)
self.name = kwargs['name']
if 'pf' in kwargs: self.pf = kwargs['pf']
if 'rvtype' in kwargs: self.rvtype = kwargs['rvtype']
return True
def check_method_exists(obj=None, name=""):
if obj == None:
raise StandardError('Object is not defined, check syntax.')
if (not hasattr(obj,name) or
not callable(getattr(obj,name))):
raise StandardError('No %s() method found.' % name)
return True
[docs]def get_obj(obj_name):
"""Returns a Param, PF or RV object.
Look in the different dictionaries if an object named obj_name exists
and returns it.
Parameters
----------
obj_name : str
Object name used to define a Param, PF or RV
Returns
-------
new : Param, PF, RV
Object if found in the dictionaries
Examples
--------
>>> import statspy as sp
>>> mypmf = sp.PF('mypmf=poisson(n;lbda=5)')
>>> lbda = sp.get_obj('lbda')
>>> lbda.label = '\\\\lambda'
"""
obj = None
for obj_dict in (_drvs, _dpfs, _dparams):
if not obj_name in obj_dict: continue
if obj != None:
logger.warning('%s is found multiple times !' % obj_name)
logger.debug('%s has been found in %s' % (obj_name, obj_dict))
obj = obj_dict[obj_name]['obj']
return obj
def update_status(obj_dict):
"""Check the list of objects depending on this parameter and update
their isuptodate status to False.
"""
for obj_key in ['rvs','pfs','params']:
if not obj_key in obj_dict: continue
for obj in obj_dict[obj_key]:
if hasattr(obj, "isuptodate"): obj.isuptodate = False
if hasattr(obj, "name") and obj.name != None:
if obj_key == "rvs" and obj.name in _drvs:
update_status(_drvs[obj.name])
if obj_key == "pfs" and obj.name in _dpfs:
update_status(_dpfs[obj.name])
if obj_key == "params" and obj.name in _dparams:
update_status(_dparams[obj.name])