# PeakOTron.py
import os
import numpy as np
import pandas as pd
from itertools import chain
from iminuit import Minuit
from iminuit.util import describe
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib import cm
from numba import njit
from difflib import ndiff
import re
from KDEpy import FFTKDE
import scipy.special as sc
from scipy.integrate import quad
from scipy.stats import poisson, skew, norm, binom
from scipy.interpolate import interp1d
from scipy.signal import find_peaks, peak_widths
from scipy.signal import fftconvolve as convolve
from astropy.stats import knuth_bin_width, freedman_bin_width, scott_bin_width, bootstrap
from sklearn.preprocessing import RobustScaler
from AdditionalPDFs import gpoisson, borel
class FakeFuncCode:
    # Mimics a Python code object (co_varnames/co_argcount) so that iminuit's
    # introspection sees the desired parameter names of a wrapped cost function.
    def __init__(self, f, prmt=None, dock=0, append=None):
        # f can either be a tuple or a function object
        self.co_varnames = list(describe(f))
        self.co_argcount = len(self.co_varnames)
        self.co_argcount -= dock
        self.co_varnames = self.co_varnames[dock:]
        if prmt is not None:  # rename parameters from the front
            for i, p in enumerate(prmt):
                self.co_varnames[i] = p
        if isinstance(append, str):
            append = [append]
        if append is not None:
            old_count = self.co_argcount
            self.co_argcount += len(append)
            self.co_varnames = tuple(
                list(self.co_varnames[:old_count]) +
                append +
                list(self.co_varnames[old_count:]))
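# FakeFuncCode mimics the func_code attribute that iminuit inspects to discover
# parameter names. A minimal sketch of the intended use (hypothetical cost
# function, not part of this file):
#
#   def cost(x, a, b):
#       return a*x + b
#
#   fc = FakeFuncCode(cost, dock=True)
#   fc.co_varnames   # -> ['a', 'b']  ('x' docked from the front)
#   fc.co_argcount   # -> 2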
class BandWidthOptimiser:
    # Likelihood (leave-one-out style) cross-validation of the KDE bandwidth,
    # after Habbema, Hermans and van den Broek (1974) and Duin (1976).
    # https://cran.r-project.org/web/packages/kedd/vignettes/kedd.pdf
    def _Dummy(self, _, bw):
        return bw
    def _KDE(self, bw):
        try:
            x_kde, y_kde = FFTKDE(kernel=self.kernel, bw=bw).fit(self.data).evaluate(self.n_kde_samples)
            loss = -np.sum(np.log(y_kde))
        except Exception:
            loss = self.eps_inv
        return loss
def _PPF(self, data):
""" Compute ECDF """
x = np.sort(data)
n = x.size
y = np.arange(1, n+1) / n
_ppf = interp1d(y, x)
return _ppf
def __init__(self, data, kernel="gaussian", alpha = 0.95, n_kde_samples=2**13):
if(alpha<=0.5):
raise ValueError("alpha must be above 0.5")
self.data = data
self.PPF = self._PPF(self.data)
self.data = self.data[(self.data<self.PPF(alpha))]
self.N = len(data)
self.kernel=kernel
self.n_kde_samples=n_kde_samples
self.last_arg = None
self.func_code = FakeFuncCode(self._Dummy, dock=True)
self.eps = np.finfo(np.float64).eps * 10
self.eps_inv = 1/self.eps
def __call__(self, *arg):
self.last_arg = arg
loss = self._KDE(*arg)
return loss
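# Sketch of how the optimiser is driven elsewhere in this file (see
# BootstrapKDE): Minuit minimises the cross-validation loss over bw.
# Hypothetical standalone example, assuming `samples` is a 1D numpy array:
#
#   bwo = BandWidthOptimiser(samples, kernel="gaussian")
#   m = Minuit(bwo, bw=0.1)
#   m.limits["bw"] = (1e-5, 1.0)
#   m.migrad()
#   best_bw = m.values["bw"]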
class BinnedLH:
def __init__(self, f, x, y):
self.f = f
self.x = x
self.y = y
self.last_arg = None
self.func_code = FakeFuncCode(f, dock=True)
self.ndof = len(self.y) - (self.func_code.co_argcount - 1)
self.eps = np.finfo(np.float64).eps * 10
self.eps_inv = 1/self.eps
self.log_eps = np.log(self.eps)
def Logify(self, y):
return np.where(y>self.eps, np.log(y), self.log_eps)
def __call__(self, *arg):
self.last_arg = arg
y_hat = self.f(self.x, *arg)
# y_hat = np.nan_to_num(y_hat, nan=self.eps_inv)
logL = self.y*(self.Logify(y_hat) - self.Logify(self.y)) + (self.y - y_hat)
nlogL = -np.sum(logL)
return nlogL
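# BinnedLH is a Baker-Cousins style binned likelihood: per bin it accumulates
# y*(log(y_hat) - log(y)) + (y - y_hat) and returns the negated sum, so the
# minimum sits at y_hat = y. Hypothetical usage sketch with a one-parameter
# model (names `model`, `bin_centres`, `bin_density` are illustrative):
#
#   def model(x, loc):
#       return norm.pdf(x, loc, 1.0)
#
#   cost = BinnedLH(model, bin_centres, bin_density)
#   m = Minuit(cost, loc=0.0)
#   m.errordef = Minuit.LIKELIHOOD
#   m.migrad()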
class Chi2Regression:
    def __init__(self, f, x, y, y_err):
self.f = f
self.x = x
self.y = y
self.y_err = y_err
self.eps = np.finfo(np.float64).eps * 10
self.y_err[self.y_err<self.eps] = self.eps
self.last_arg = None
self.func_code = FakeFuncCode(f, dock=True)
self.ndof = len(self.y) - (self.func_code.co_argcount - 1)
def __call__(self, *arg):
self.last_arg = arg
loss = ((self.f(self.x, *arg) - self.y)/(self.y_err))**2
return np.sum(loss)
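# Chi2Regression usage mirrors BinnedLH, but as a least-squares cost;
# m.fval / cost.ndof then gives the reduced chi-square. Hypothetical sketch,
# reusing the illustrative `model` above:
#
#   cost = Chi2Regression(model, x, y, y_err)
#   m = Minuit(cost, loc=0.0)
#   m.errordef = Minuit.LEAST_SQUARES
#   m.migrad()
#   reduced_chi2 = m.fval / cost.ndof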
class HuberRegression:
def __init__(self, f, x, y, y_err, delta=1.35):
self.f = f
self.x = x
self.y = y
self.y_err = y_err
self.delta = delta
self.eps = np.finfo(np.float64).eps * 10
self.y_err[self.y_err<self.eps] = self.eps
self.last_arg = None
self.func_code = FakeFuncCode(f, dock=True)
self.ndof = len(self.y) - (self.func_code.co_argcount - 1)
    def __call__(self, *arg):
        self.last_arg = arg
        a = np.abs((self.y - self.f(self.x, *arg))/self.y_err)
        cond_flag = (a > self.delta)
        # Quadratic core, linear tails (see the Huber-loss note below).
        loss = np.where(cond_flag,
                        self.delta*(a - 0.5*self.delta),
                        0.5*a**2)
        return np.sum(loss)
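# The Huber loss applied above, for the error-normalised residual
# a = |y - f(x)| / y_err:
#
#   L(a) = 0.5*a**2                   if a <= delta
#   L(a) = delta*(a - 0.5*delta)      if a >  delta
#
# Quadratic near zero, linear in the tails, so outlier peaks have bounded
# influence on the linear variance fit in PreFitSigma.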
@njit
def _SelectRangeNumba(array, low_lim, hi_lim):
index = (array >= low_lim) & (array <= hi_lim)
return array[index]
class PeakOTron:
def __init__(self,
verbose=False,
n_call_minuit=1000,
n_iterations_minuit = 10
):
self.Init()
self.greek_letters = ["Alpha",
"alpha",
"Beta",
"beta",
"Gamma",
"gamma",
"Delta",
"delta",
"Epsilon",
"epsilon",
"Zeta",
"zeta",
"Eta",
"eta",
"Theta",
"theta",
"Iota",
"iota",
"Kappa",
"kappa",
"Lbda",
"lbda",
"Mu",
"mu",
"Nu",
"nu",
"Xi",
"xi",
"Omicron",
"omicron",
"Pi",
"pi",
"Rho",
"rho",
"Sigma",
"sigma",
"Tau",
"tau",
"Upsilon",
"upsilon",
"Phi",
"phi",
"Chi",
"chi",
"Psi",
"psi",
"Omega",
"omega"]
self._eps = np.finfo(np.float64).eps * 10
self._eps_kde = 1e-5
self._FWHM2Sigma = 1/(2*np.sqrt(2*np.log(2)))
self._plot_figsize= (10,10)
self._plot_fontsize= 20
self._cmap = cm.get_cmap('viridis')
self._max_n_peaks = 12
self._max_n_dcr_peaks = 6
self._len_DCR_pad = int(100)
self._verbose=verbose
self._n_call_minuit = n_call_minuit
self._n_iterations_minuit = n_iterations_minuit
self._is_histogram = False
self._default_fit_kwargs={
"tau":None,
"t_0":None,
"r_fast":None,
"tgate":None,
"n_bootstrap": 1000,
"n_bootstrap_kde":100,
"bw":None,
"peak_dist_factor":0.8,
"peak_width_factor":0.5,
"peak_ppf":0.99,
"peak_prominence":1e-3,
"kernel_kde":"gaussian",
"bandwidth_kde":"optimise",
"ppf_mainfit":1 - 1e-6,
"bin_method":"knuth",
"alpha":0.95
}
def LevDist(self, str1, str2):
counter = {"+": 0, "-": 0}
distance = 0
for edit_code, *_ in ndiff(str1, str2):
if edit_code == " ":
distance += max(counter.values())
counter = {"+": 0, "-": 0}
else:
counter[edit_code] += 1
distance += max(counter.values())
return distance
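    # Character-level Levenshtein distance built from difflib.ndiff: runs of
    # '+'/'-' edit codes between matching characters each contribute
    # max(insertions, deletions). Hypothetical example:
    #
    #   self.LevDist("kitten", "sitting")   # -> 3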
###HELPER FUNCTIONS
def Init(self):
self._peak_data = {}
self._GD_data = {}
self._DCR_data={}
self._fit_values= {}
self._fit_errors = {}
self._fit = None
self._failed = False
def LatexFormat(self, f, scirange=[0.01,1000]):
if(np.abs(f)>scirange[0] and np.abs(f)<scirange[1]):
float_str = r"${:3.3f}$".format(f)
else:
try:
float_str = "{:3.3E}".format(f)
base, exponent = float_str.split("E")
float_str = r"${0} \times 10^{{{1}}}$".format(base, int(exponent))
            except Exception:
float_str=str(f)
return float_str
def SelectRange(self, array, low_lim, hi_lim):
return _SelectRangeNumba(array, low_lim, hi_lim)
    def Logify(self, y):
        # Replace log(0) with the smallest finite log value present in y,
        # without emitting divide-by-zero warnings.
        y = np.asarray(y, dtype=float)
        log_y = np.log(np.where(y > 0, y, 1.0))
        min_log_y = np.min(log_y[y > 0])
        return np.where(y > 0, log_y, min_log_y)
    def SetMaxNPeaks(self, max_n_peaks):
        max_n_peaks = int(max_n_peaks)
        if(max_n_peaks<3):
            raise Exception("Maximum number of peaks must be at least 3.")
        self._max_n_peaks = max_n_peaks
        print("Set maximum number of peaks to {:d}.".format(self._max_n_peaks))
    def SetMaxNDCRPeaks(self, max_n_dcr_peaks):
        max_n_dcr_peaks = int(max_n_dcr_peaks)
        if(max_n_dcr_peaks<2):
            raise Exception("Maximum number of dark peaks must be at least 2.")
        self._max_n_dcr_peaks = max_n_dcr_peaks
        print("Set maximum number of dark peaks to {:d}.".format(self._max_n_dcr_peaks))
def DummyLinear(self, x, m, c):
return m*x + c
def DummyLogGPois(self, k, mu, lbda, A):
return A + gpoisson.logpmf(k, mu, lbda)
def SigmaK(self, k, sigma_0, sigma_1):
return np.sqrt(sigma_0**2 + k*sigma_1**2)
def GPoisPPF(self, q, mu, lbda, k_max):
_sum = 0
_count = 0
while _count<=k_max:
_sum += gpoisson.pmf(_count, mu, lbda)
if(_sum<q):
_count+=1
else:
break
return _count
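    # GPoisPPF walks the generalised Poisson CDF upwards and returns the
    # smallest k with sum_{j<=k} pmf(j) >= q (or k_max+1 if the quantile is
    # never reached). Hypothetical example: the number of photoelectron peaks
    # covering 99% of the distribution:
    #
    #   k_hi = self.GPoisPPF(0.99, mu=2.0, lbda=0.2, k_max=20)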
def EmpiricalPPF(self, data):
""" Compute ECDF """
x = np.sort(data)
n = x.size
y = np.arange(1, n+1) / n
_ppf = interp1d(y, x)
return _ppf
    def FFTGain(self, x_kde, y_kde):
        # Estimate the gain from the periodicity of the pulse-height spectrum:
        # the comb of photoelectron peaks shows up as a line in the power
        # spectral density at f = 1/G, so G = 1/f_peak of the strongest
        # non-DC PSD peak.
        n = len(x_kde)
        dt = abs(x_kde[1]-x_kde[0])
        fhat = np.fft.fft(y_kde, n)  # FFT of the KDE spectrum
        psd = fhat * np.conj(fhat)/n
        freq = (1/(dt*n)) * np.arange(n)  # frequency array
        idxs_half = np.arange(1, np.floor(n/2), dtype=np.int32)  # positive half, DC bin dropped
        y = np.abs(psd[idxs_half])
        x = freq[idxs_half]
        peaks, _ = find_peaks(y)
        idx_sort = np.argsort(y[peaks])
        y_peaks = y[peaks][idx_sort]
        x_peaks = x[peaks][idx_sort]
        x_peak = x_peaks[-1]  # frequency of the highest-power peak
# plt.figure(figsize=(10.0, 10.0))
# plt.title("$G$={:s} Q.U.".format(self.LatexFormat((1/x_peak)*self.scaler.scale_[0])), fontsize=20)
# plt.plot(x, y, lw=3)
# plt.ylabel("Energy Spectral Density $|S(f_{PH})|^{2}$", fontsize=25)
# plt.xlabel("$f_{PH}{$ [Inv. Q.U.]", fontsize=20)
# plt.axvline(x=x_peak, color="purple", lw=3, linestyle="--", label="Maximum Peak")
# plt.yscale("log")
# plt.xscale("log")
# plt.xticks(fontsize=20)
# plt.yticks(fontsize=20)
# plt.grid(which="both")
# plt.legend(fontsize=15)
# plt.show()
gain = 1/x_peak
return gain
def EstLbda(self, data_sub_x_0):
_skew = skew(data_sub_x_0)
_mean = np.mean(data_sub_x_0)
_std = np.std(data_sub_x_0)
_lbda = 0.5*(((_mean*_skew)/(_std))- 1)
return _lbda
def EstGain(self, data_sub_x_0):
_skew = skew(data_sub_x_0)
_mean = np.mean(data_sub_x_0)
_std = np.std(data_sub_x_0)
_lbda = 0.5*(((_mean*_skew)/(_std))- 1)
_gain = (_std**2/_mean)*((1 - _lbda)**2)
return _gain
def EstMu(self, data_sub_x_0):
_skew = skew(data_sub_x_0)
_mean = np.mean(data_sub_x_0)
_std = np.std(data_sub_x_0)
_lbda = 0.5*(((_mean*_skew)/(_std))- 1)
_mu = (1/(1-_lbda))*(_mean**2/_std**2)
return _mu
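    # EstLbda, EstGain and EstMu are method-of-moments inversions of the
    # generalised Poisson pulse-height moments. For pedestal-subtracted data
    # with gain G and GP parameters (mu, lbda):
    #
    #   mean = G*mu/(1-lbda)
    #   var  = G**2*mu/(1-lbda)**3
    #   skew = (1+2*lbda)/sqrt(mu*(1-lbda))
    #
    # so that  mean*skew/std = 1 + 2*lbda,  (var/mean)*(1-lbda)**2 = G  and
    # (mean**2/var)/(1-lbda) = mu, which are exactly the expressions above.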
    def Bootstrap(self, data, statistic, alpha=0.95, n_samples=10000):
        if not (0 < alpha < 1):
            raise ValueError("confidence level must be in (0, 1)")
        if n_samples <= 0:
            raise ValueError("n_samples must be a positive integer.")
        if len(data) < 1:
            raise ValueError("data must contain at least one measurement.")
        boot_stat = bootstrap(data, n_samples, bootfunc=statistic)
        stat_data = statistic(data)
        mean_stat = np.mean(boot_stat)
        # Bias-corrected estimate: 2*stat(data) minus the bootstrap mean.
        est_stat = 2*stat_data - mean_stat
        std_err = np.std(boot_stat)
        # Two-sided z-score for the requested central confidence level.
        z_score = np.sqrt(2.0)*sc.erfinv(alpha)
        conf_interval = est_stat + z_score*np.array((-std_err, std_err))
        return est_stat, std_err, conf_interval
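    # Bootstrap returns a bias-corrected point estimate, its standard error,
    # and a central normal-approximation confidence interval. Hypothetical
    # call, assuming `samples` is a 1D numpy array:
    #
    #   est_mean, err_mean, ci = self.Bootstrap(samples, np.mean,
    #                                           alpha=0.95, n_samples=1000)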
def BootstrapKDE(self, data, kernel = "gaussian", bw="optimise",
n_kde_samples=2**13,
alpha=0.95,
n_samples=1000,
limits=None
):
if not (0 < alpha < 1):
raise ValueError("Bootstrap confidence level, alpha, must be in (0, 1).")
n = n_samples
if n <= 0:
raise ValueError("Bootstrap data must contain at least one measurement.")
kernels = list(FFTKDE._available_kernels.keys())
if kernel not in kernels:
raise ValueError("Selected KDE kernel not available. Please choose from these options: {:s}.".format(", ".join(kernels)))
if(not isinstance(bw, float)):
bws = list(FFTKDE._bw_methods.keys()) + ["optimise"]
if bw not in bws:
raise ValueError("Selected KDE bandwidth estimation method not available. Please choose from these options: {:s}.".format(", ".join(bws)))
if(bw=="optimise"):
try:
BWO= BandWidthOptimiser(data, kernel=kernel)
bw_scott = scott_bin_width(data)
minuit_kde = Minuit(BWO, bw=bw_scott/2)
minuit_kde.limits["bw"]=(self._eps_kde, 3*bw_scott)
minuit_kde.strategy=2
minuit_kde.migrad(ncall= self._n_call_minuit,
iterate =self._n_iterations_minuit)
bw = minuit_kde.values["bw"]
if(self._verbose):
print("KDE Bandwidth optimised to: {0}".format(bw))
                except Exception:
if(self._verbose):
print("KDE Bandwidth optimisation failed. Defaulting to Improved Sheather Jones.")
bw = "ISJ"
kde = FFTKDE(kernel = kernel, bw=bw).fit(data)
x_kde, y_kde_orig = kde.evaluate(n_kde_samples)
boot_data = bootstrap(data, n_samples)
y_kde_bs = np.vstack([FFTKDE(kernel = kernel, bw=bw).fit(_data).evaluate(x_kde)
for _data in boot_data])
y_kde_mean = np.mean(y_kde_bs, axis=0)
y_kde = 2*y_kde_orig - y_kde_mean
y_kde_err = np.std(y_kde_bs, axis=0)
z_score = np.sqrt(2.0)*sc.erfinv(alpha)
y_kde_conf_interval = y_kde + z_score*np.array((-y_kde_err, y_kde_err))
        if(limits is not None):
            cond_inrange = (x_kde>np.min(limits)) & (x_kde<np.max(limits))
            x_kde = x_kde[cond_inrange]
            y_kde = y_kde[cond_inrange]
            y_kde_err = y_kde_err[cond_inrange]
            y_kde_conf_interval = y_kde_conf_interval[:, cond_inrange]
        return x_kde, y_kde, y_kde_err, y_kde_conf_interval
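    # BootstrapKDE returns the KDE evaluated on a fixed grid together with
    # pointwise bootstrap errors and confidence bands. Hypothetical call,
    # assuming `samples` is a 1D numpy array:
    #
    #   x_kde, y_kde, y_err, ci = self.BootstrapKDE(samples,
    #                                               kernel="gaussian",
    #                                               bw="ISJ",
    #                                               n_samples=100)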
##MODEL DEFINITIONS
    def DRM_basic(self, x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi):
        # Light spectrum: pedestal + k-photoelectron Gaussian peaks + XTM tail
        # terms, each weighted by the generalised Poisson pmf.
        k = np.arange(k_low, k_hi)
        k = sorted(list(k[k!=0]))  # k = 0 (pure pedestal) is handled separately as f0
        k_is = [k[0:int(_k)] for _k in k]  # tail multiplicities i = 1..k for each k
        f0 = gpoisson.pmf(0, mu, lbda)*norm.pdf(x, x_0, sigma_0)
f1s = np.asarray([
binom.pmf(0, _k, alpha)*gpoisson.pmf(_k, mu, lbda)*norm.pdf(x, x_0 + _k*G, self.SigmaK(_k, sigma_0, sigma_1))
for _k in k
])
f2s = np.vstack([
np.asarray([
binom.pmf(_i, _k, alpha)*gpoisson.pmf(_k, mu, lbda)*self.XTM(x, _k, _i, x_0, G, sigma_0, sigma_1, r_beta)
for _i in _is
])
for _k, _is in zip(k, k_is)
])
f = np.sum(np.vstack([f0, f1s, f2s]), axis=0)
return f
def XTM(self, x, k, i, x_0, G, sigma_0, sigma_1, r_beta):
x_k = x - (x_0 + k*G)
_sigma_k = self.SigmaK(k, sigma_0, sigma_1)
x_ct_pad = np.zeros_like(x_k)
if(i==1):
cond = (x_k > -5*_sigma_k)
error_f = 0.5*(1.0 + sc.erf(x_k/(_sigma_k*np.sqrt(2.0))))
log_f = -x_k/(r_beta*G) - np.log(r_beta*G)
x_ct = np.where(cond, np.exp(log_f)*error_f, x_ct_pad)
elif(i>1):
cond = (x_k > 0)
log_f = sc.xlogy(i-1, x_k) - sc.gammaln(i) - sc.xlogy(i, r_beta*G) - x_k/(r_beta*G)
x_ct = np.where(cond, np.exp(log_f), x_ct_pad)
else:
x_ct = x_k
return x_ct
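    # XTM pulse-height densities, with x_k = x - (x_0 + k*G),
    # sigma_k = SigmaK(k, sigma_0, sigma_1) and Phi the standard normal CDF:
    #
    #   i = 1:  exp(-x_k/(r_beta*G))/(r_beta*G) * Phi(x_k/sigma_k)
    #           (an exponential tail smeared by the Gaussian peak resolution)
    #   i > 1:  x_k**(i-1) * exp(-x_k/(r_beta*G)) / (Gamma(i)*(r_beta*G)**i)
    #           (a Gamma density with shape i and scale r_beta*G)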
    def PH_range(self, t, t_0, r_fast, tau, tgate):
        # Range of pulse heights (as a fraction of the 1 p.e. amplitude) that a
        # dark count can produce for an arrival time t before the gate
        # (t_0 < t < 0) or inside the gate (0 < t < tgate).
        if((t>t_0) and (t<0)):
            PH_min = (1-r_fast)*np.exp(t_0/tau)*(1 - np.exp(-tgate/tau))
            PH_max = (1-r_fast)*(1 - np.exp(-tgate/tau))
        elif((t>0) and (t<tgate)):
            PH_min = r_fast
            PH_max = (1 - (1-r_fast)*np.exp(-tgate/tau))
        else:
            PH_min = 0
            PH_max = 0
        return PH_min, PH_max
    def dpdPH(self, x, x_0, G, tau, tgate, t_0, r_fast):
        # Pulse-height density for a single dark count with a uniformly
        # distributed arrival time: dp/dPH ~ 1/PH before the gate and
        # ~ 1/(1-PH) inside the gate.
        PH_bfg_low, PH_bfg_hi = self.PH_range(-abs(t_0)/2, -abs(t_0), r_fast, tau, tgate)
        PH_dg_low, PH_dg_hi = self.PH_range(tgate/2, -abs(t_0), r_fast, tau, tgate)
x_norm = (x-x_0)/(G)
PHs = np.zeros_like(x_norm)
cond_bfg = (x_norm>PH_bfg_low) & (x_norm<=PH_bfg_hi)
cond_dg = (x_norm>PH_dg_low) & (x_norm<=PH_dg_hi)
PHs[cond_bfg] += 1/x_norm[cond_bfg]
PHs[cond_dg] += 1/(1 - x_norm[cond_dg])
return PHs
    def ApplyDCR(self, x, y_light, x_0, G, DCR, tgate, t_0, tau, r_fast, lbda, dx, k_dcr_low, k_dcr_hi):
        # Expected number of dark counts in the acquisition window.
        mu_dcr = DCR*(abs(t_0) + tgate)
        # Work on a padded, uniform index axis so the convolutions below do not
        # wrap around the edges of the spectrum.
        x_lin = np.arange(0, len(x))
        x_pad_lin = np.arange(-self._len_DCR_pad, len(x)+self._len_DCR_pad)
        f_lin = interp1d(x, x_lin, fill_value='extrapolate')
        G_lin = f_lin(x_0+G) - f_lin(x_0)
        x_0_lin = f_lin(x_0)
idx_orig = np.where(np.in1d(x_pad_lin,
np.intersect1d(
x_pad_lin,
x_lin
)
), True, False)
idx_x_0_lin = np.argmin(abs(x_pad_lin - x_0_lin))
n_dcr = np.arange(k_dcr_low, max(2, k_dcr_hi) )
fs = []
pfs = []
hs = []
phs = []
        for _n_dcr in n_dcr:
            if(_n_dcr==0):
                # No dark count: a delta function at the pedestal position.
                f = np.zeros(len(x_pad_lin))
                f[idx_x_0_lin] = 1
                pf = poisson.pmf(0, mu_dcr)
                h = np.zeros(len(x_pad_lin))
                ph = 0
            else:
                if(len(fs)<=1):
                    # One dark count: analytic pulse-height density.
                    f = self.dpdPH(x_pad_lin, x_0_lin, G_lin, tau, tgate, t_0, r_fast)
                else:
                    # n dark counts: iterated self-convolution of the one-count density.
                    f = convolve(fs[1], fs[-1])[idx_x_0_lin:len(x_pad_lin)+idx_x_0_lin]
                f = f/np.trapz(f, dx=1)
                pf = poisson.pmf(_n_dcr, mu_dcr)*(borel.pmf(0, lbda)**_n_dcr)
                # Cross-talk term: one dark count whose Borel branching gives an
                # (_n_dcr+1)-fold amplitude.
                h = self.dpdPH(x_pad_lin, x_0_lin, G_lin*(_n_dcr+1), tau, tgate, t_0, r_fast)
                h = h/np.trapz(h, dx=1)
                ph = poisson.pmf(1, mu_dcr)*borel.pmf(_n_dcr, lbda)/(_n_dcr+1)
fs.append(f)
pfs.append(pf)
hs.append(h)
phs.append(ph)
fs = np.asarray(fs)
hs = np.asarray(hs)
pfs = np.expand_dims(np.asarray(pfs), -1)
phs = np.expand_dims(np.asarray(phs), -1)
y_dark = np.sum(fs*pfs, axis=0) + np.sum(hs*phs, axis=0)
#y_dark = y_dark/np.trapz(h, dx = 1)
y_dark = y_dark/np.trapz(y_dark, dx = 1)
y_model = convolve(y_dark,
np.pad(y_light,
(self._len_DCR_pad, self._len_DCR_pad),
"constant",
constant_values=(0.0,0.0)),
)[idx_x_0_lin:len(x_pad_lin)+idx_x_0_lin]
y_model = y_model/np.trapz(y_model, dx = 1)/dx
y_model = y_model[idx_orig]
return y_model
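    # In summary, ApplyDCR builds the dark spectrum as
    #   y_dark = sum_n Poisson(n; mu_dcr)*Borel(0; lbda)**n * f_n + cross-talk terms,
    # where f_n is the n-fold self-convolution of the single-dark-count density,
    # normalises it, and convolves it with the light-only spectrum y_light.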
    def DRM_nodcr(
            self,
            x,
            mu,
            x_0,
            G,
            sigma_0,
            sigma_1,
            alpha,
            r_beta,
            lbda,
            k_low,
            k_hi):
        # Light-only response: DRM_basic takes no dx argument, and no
        # dark-count convolution is applied.
        y_model = self.DRM_basic(x,
                                 mu,
                                 x_0,
                                 G,
                                 sigma_0,
                                 sigma_1,
                                 alpha,
                                 r_beta,
                                 lbda,
                                 k_low,
                                 k_hi
                                 )
        return y_model
def DRM(
self,
x,
mu,
x_0,
G,
sigma_0,
sigma_1,
alpha,
r_beta,
lbda,
k_low,
k_hi,
k_dcr_low,
k_dcr_hi,
DCR,
tau,
tgate,
t_0,
r_fast,
dx
):
y_light = self.DRM_basic(x,
mu,
x_0,
G,
sigma_0,
sigma_1,
alpha,
r_beta,
lbda,
k_low,
k_hi
)
y_model = self.ApplyDCR(x,
y_light,
x_0,
G,
DCR,
tgate,
t_0,
tau,
r_fast,
lbda,
dx,
k_dcr_low,
k_dcr_hi)
return y_model
###PLOTTING
def PlotOriginalHistogram(self, ax):
ax.plot(self._GD_data["x_s"],
self._GD_data["y_s"],
lw=5,
label="Data")
ax.set_title("Normalised \n Input \n Distribution",fontsize=self._plot_fontsize)
ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize)
ax.tick_params(axis="x", labelsize=self._plot_fontsize)
ax.tick_params(axis="y", labelsize=self._plot_fontsize)
ax.set_yscale("log")
ax.grid(which="both")
ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotKDE(self, ax):
ax.plot(self._GD_data["x_s"],
self._GD_data["y_s"],
label="Data",
lw=5
)
ax.plot(self._GD_data["x_kde_s"],
self._GD_data["y_kde_s"],
lw=2.5,
label="KDE")
ax.set_title("Kernel Density Estimate", fontsize=self._plot_fontsize)
ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize)
ax.tick_params(axis="x", labelsize=self._plot_fontsize)
ax.tick_params(axis="y", labelsize=self._plot_fontsize)
ax.set_yscale("log")
ax.grid(which="both")
ax.set_ylim([1e-5, None])
ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotPeaks(self, ax):
ax.plot(self._GD_data["x_s"],
self._GD_data["y_s"],
label="Input Pulse \n Height \n Spectrum",
lw=5
)
color_vals = [self._cmap(_v) for _v in np.linspace(0,0.75, len(self._peak_data["x_peak_s"]))]
for _i_x, (_x, _x_w, _c_v) in enumerate(zip(
self._peak_data["x_peak_s"],
self._peak_data["x_width_s"],
color_vals
)):
_x_peak =_x
_x_width_low = _x - _x_w/2
_x_width_hi = _x + _x_w/2
_y_peak = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_peak))]
_y_width_low = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_width_low))]
_y_width_hi = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_width_hi))]
if(_i_x == 0):
annotation_text = "Pedestal"
color = "red"
else:
annotation_text = "Peak {:d}".format(_i_x)
color = _c_v
ax.vlines(x=[_x],
ymin=0,
ymax=_y_peak,
color=color,
linestyle="-",
lw=2.5,
                  label='\n{:s}: \n $x_{{pos}}$ = {:3.3f} Q.U \n $\\Delta x_{{pos}}$ = {:3.3f} Q.U \n'.format(
                      annotation_text,
                      _x_peak,
                      _x_width_hi - _x_width_low)
)
ax.vlines(x=[_x_width_low],
ymin=0,
ymax=_y_width_low,
color=color,
lw=2.5,
linestyle="--")
ax.vlines(x=[_x_width_hi],
ymin=0,
ymax=_y_width_hi,
color=color,
lw=2.5,
linestyle="--")
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
# Put a legend to the right of the current axis
ax.legend(loc='center left',
bbox_to_anchor=(1, 0.5),
fontsize=max(10, self._plot_fontsize//2),
ncol=len(self._peak_data["x_peak_s"])//6 + 1
)
ax.set_title("Peak Finding \n # Peaks = {:d}".format(len(self._peak_data["x_peak_s"])) ,
fontsize=self._plot_fontsize)
ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize)
ax.tick_params(axis="x", labelsize=self._plot_fontsize)
ax.tick_params(axis="y", labelsize=self._plot_fontsize)
ax.set_yscale("log")
ax.grid(which="both")
def PlotGenPois(self, ax):
ax.scatter(self._peak_data["n_peak_s"],
self.Logify(self._peak_data["y_peak_s"]),
label="Extracted Peak Heights"
)
y_lgp = self.DummyLogGPois(self._peak_data["n_peak_s"],
self._GD_data["fit_GP_mu"],
self._GD_data["fit_GP_lbda"],
self._GD_data["fit_GP_A"]
)
ax.plot(self._peak_data["n_peak_s"],
y_lgp,
linestyle="--",
lw=2.5,
color="red",
label="Fit")
ax.set_title(
"Generalised Poisson Fit \n $\\mu$ = {0} $\pm$ {1} \n $\\lambda$ = {2} $\pm$ {3} \n $A$ = {4} $\pm$ {5}".format(
self.LatexFormat(self._GD_data["fit_GP_mu"]),
self.LatexFormat(self._GD_data["fit_GP_mu_err"]),
self.LatexFormat(self._GD_data["fit_GP_lbda"]),
self.LatexFormat(self._GD_data["fit_GP_lbda_err"]),
self.LatexFormat(self._GD_data["fit_GP_A"]),
self.LatexFormat(self._GD_data["fit_GP_A_err"])
), fontsize=self._plot_fontsize)
ax.set_xlabel("Peak", fontsize=self._plot_fontsize)
ax.set_ylabel("$\\log{\\frac{dP}{dPH}}$", fontsize=self._plot_fontsize)
ax.tick_params(axis="x", labelsize=self._plot_fontsize)
ax.tick_params(axis="y", labelsize=self._plot_fontsize)
ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
ax.grid(which="both")
def PlotSigmaEst(self, ax):
c = self._peak_data["cond_valid_peak"]
ax.set_title("Estimated Peak Variance \n $\\sigma^{{2}}_{{0}}$={0} $\pm$ {1} \n $\\sigma^{{2}}_{{1}}$={2} $\pm$ {3}".format(
self.LatexFormat(self._GD_data["fit_var0"]),
self.LatexFormat(self._GD_data["fit_var0_err"]),
self.LatexFormat(self._GD_data["fit_var1"]),
self.LatexFormat(self._GD_data["fit_var1_err"]),
), fontsize=self._plot_fontsize)
ax.errorbar(self._peak_data["n_peak_s"][c],
self._peak_data["peak_var_s"][c],
yerr=self._peak_data["peak_var_err_s"][c],
label="Extracted Variances",
fmt="o")
ax.plot(self._peak_data["n_peak_s"][c],
self.DummyLinear(self._peak_data["n_peak_s"][c],
self._GD_data["fit_var1"],
self._GD_data["fit_var0"]
),
linestyle="--",
lw=2.5,
color="red",
label="Fit")
ax.set_xlabel("Peak", fontsize=self._plot_fontsize)
ax.set_ylabel("$\sigma^{2}_{peak}$", fontsize=self._plot_fontsize)
ax.tick_params(axis="x", labelsize=self._plot_fontsize)
ax.tick_params(axis="y", labelsize=self._plot_fontsize)
ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
ax.grid(which="both")
# def PlotGainEst(self, ax):
# c = self._peak_data["cond_valid_peak"]
# ax.set_title("Estimated Gain \n $G$={0} $\pm$ {1} \n $PH_{{0}}$={2} $\pm$ {3}".format(
# self.LatexFormat(self._GD_data["fit_G"]),
# self.LatexFormat(self._GD_data["fit_G_err"]),
# self.LatexFormat(self._GD_data["fit_x_0"]),
# self.LatexFormat(self._GD_data["fit_x_0_err"]),
# ), fontsize=self._plot_fontsize)
# ax.errorbar(self._peak_data["n_peak_s"][c],
# self._peak_data["peak_mean_s"][c],
# yerr=self._peak_data["peak_mean_err_s"][c],
# label="Extracted Peak Means",
# fmt="o")
# ax.plot(self._peak_data["n_peak_s"][c],
# self.DummyLinear(self._peak_data["n_peak_s"][c],
# self._GD_data["fit_G"],
# self._GD_data["fit_x_0"]
# ),
# linestyle="--",
# lw=2.5,
# color="red",
# label="Fit")
# ax.set_xlabel("Peak", fontsize=self._plot_fontsize)
# ax.set_ylabel("$<Peak>$", fontsize=self._plot_fontsize)
# ax.tick_params(axis="x", labelsize=self._plot_fontsize)
# ax.tick_params(axis="y", labelsize=self._plot_fontsize)
# ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
# ax.grid(which="both")
def PlotDCREst(self, ax):
x_dc_kde = self._DCR_data["x_dc_kde"]
y_dc_kde = self._DCR_data["y_dc_kde"]
y_dc_kde_err = self._DCR_data["y_dc_kde_err"]
cond_p_0toLM = self._DCR_data["cond_p_0toLM"]
cond_p_LMto2LM = self._DCR_data["cond_p_LMtoL2M"]
cond_interpeak = self._DCR_data["cond_interpeak"]
int_p_0toLM = self._DCR_data["int_p_0toLM"]
dint_p_0toLM = self._DCR_data["dint_p_0toLM"]
int_p_LMto2LM = self._DCR_data["int_p_LMto2LM"]
dint_p_LMto2LM = self._DCR_data["dint_p_LMto2LM"]
mean_p_interpeak = self._DCR_data["mean_p_interpeak"]
dmean_p_interpeak = self._DCR_data["dmean_p_interpeak"]
ax.fill_between(
x_dc_kde[cond_p_0toLM],
y_dc_kde[cond_p_0toLM],
color='none',
hatch="///",
edgecolor="b",
label = r"$I_{{0 \rightarrow x_{{min}}}}$ = {:s} $\pm$ {:s}".format(self.LatexFormat(int_p_0toLM),
self.LatexFormat(dint_p_0toLM) )
)
ax.fill_between(
x_dc_kde[cond_p_LMto2LM],
y_dc_kde[cond_p_LMto2LM],
color='none',
hatch="///",
edgecolor="r",
label = r"$I_{{x_{{min}} \rightarrow 2x_{{min}}}}$ = {:s} $\pm$ {:s}".format(self.LatexFormat(int_p_LMto2LM),
self.LatexFormat(dint_p_LMto2LM) )
)
ax.fill_between(
x_dc_kde[cond_interpeak],
y_dc_kde[cond_interpeak],
color='none',
hatch="xxx",
edgecolor="g",
label = r"$<I_{{x_{{min}} - x_{{lim}} \rightarrow x_{{min}} + x_{{lim}} }}>$ = {:s} $\pm$ {:s}".format(
self.LatexFormat(mean_p_interpeak),
self.LatexFormat(dmean_p_interpeak) )
)
ax.plot(x_dc_kde,
y_dc_kde,
linestyle="-",
color="purple",
label="KDE, 95% Confidence"
)
ax.fill_between(x_dc_kde,
y1=y_dc_kde - 1.96*y_dc_kde_err,
y2=y_dc_kde + 1.96*y_dc_kde_err,
alpha=0.2,
color="purple",
label="KDE, 95% Confidence"
)
ax.set_title("Dark Count Rate \n DCR = {0} $\pm$ {1}".format(
self.LatexFormat(self._DCR_data["fit_DCR"]),
self.LatexFormat(self._DCR_data["fit_DCR_err"])),
fontsize=self._plot_fontsize)
ax.set_xlabel("$PH_{norm, est}$", fontsize=self._plot_fontsize)
ax.grid(which="both")
ax.set_ylabel("$\\frac{dp_{DCR}}{PH_{norm, est}}$", fontsize=self._plot_fontsize)
ax.tick_params(axis="x", labelsize=self._plot_fontsize)
ax.tick_params(axis="y", labelsize=self._plot_fontsize)
ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
ax.set_yscale("log")
def PlotFit(self,
figsize=(10.5,10.5),
labelsize=None,
ticksize=None,
titlesize=None,
legendsize=None,
title=None,
xlabel = None,
scaled=False,
save_directory=None,
y_limits = [None, None],
residual_limits = [-5, 5],
linewidth_main = 5,
x_limits = [None, None],
display=True
):
if(labelsize is None):
labelsize=self._plot_fontsize
if(ticksize is None):
ticksize=self._plot_fontsize
if(titlesize is None):
titlesize = self._plot_fontsize
if(legendsize is None):
legendsize = max(10, self._plot_fontsize//1.5)
if(scaled):
x = self._GD_data["x_s"]
y = self._GD_data["y_s"]
y_err = self._GD_data["y_err_s"]
y_hat = self.GetModelIQR(self._GD_data["x_s"])
if(xlabel is None):
xlabel = "IQR Normalised Charge Units [Q.U.]"
else:
x = self._GD_data["x"]
y = self._GD_data["y"]
y_err = self._GD_data["y_err"]
y_hat = self.GetModel(self._GD_data["x"])
if(xlabel is None):
xlabel = "Input Charge Units [Q.U.]"
fig = plt.figure(figsize=figsize)
gs = gridspec.GridSpec(2, 1, height_ratios=[2, 1])
ax0 = plt.subplot(gs[0])
ax0.plot(x, y, label="Data", lw=5, color="C0")
ax0.plot(x, y_hat, linestyle="--", label="Model", lw=5, color="C1")
ax0.set_yscale("log")
        ax0.set_ylabel("$\\frac{dp}{dPH}$", fontsize=labelsize)
ax0.tick_params(axis="x", labelsize=ticksize)
ax0.tick_params(axis="y", labelsize=ticksize)
ax0.set_ylim(y_limits)
ax0.set_xlim(x_limits)
if(title is not None):
ax0.set_title(title, fontsize=titlesize)
ax0.grid(which="both")
ax0.legend(fontsize=legendsize)
ax1 = plt.subplot(gs[1], sharex = ax0)
ax1.scatter(x, (y - y_hat)/(y_err), color="C1")
ax1.tick_params(axis="x", labelsize=ticksize)
ax1.tick_params(axis="y", labelsize=ticksize)
ax1.set_xlabel(xlabel, fontsize=labelsize)
ax1.set_ylim(residual_limits)
ax1.axhline(y=-1.0, color="purple", lw=2.5, linestyle="--")
ax1.axhline(y=1.0, color="purple", lw=2.5, linestyle="--")
ax1.tick_params(axis="x", labelsize=self._plot_fontsize)
ax1.tick_params(axis="y", labelsize=self._plot_fontsize)
ax1.grid(which="both")
fig.tight_layout()
fig.subplots_adjust(hspace=.0)
if(save_directory is not None):
print("Saving figure to {0}...".format(save_directory))
fig.savefig(save_directory)
if(display):
plt.pause(0.01)
fig.show()
else:
plt.close(fig)
def PlotSummary(self, display=True, save_directory=None):
fig = plt.figure(figsize=(20,40))
gs = gridspec.GridSpec(4, 2)
gs.update(wspace=0.5, hspace=0.5)
ax0 = fig.add_subplot(gs[0,0])
self.PlotOriginalHistogram(ax0)
ax1 = fig.add_subplot(gs[0,1])
self.PlotKDE(ax1)
ax2 = fig.add_subplot(gs[1,:])
self.PlotPeaks(ax2)
ax3 = fig.add_subplot(gs[2,0])
self.PlotGenPois(ax3)
ax4 = fig.add_subplot(gs[2,1])
self.PlotSigmaEst(ax4)
ax6 = fig.add_subplot(gs[3,1])
self.PlotDCREst(ax6)
if(save_directory is not None):
print("Saving figure to {0}...".format(save_directory))
fig.savefig(save_directory)
if(display):
plt.pause(0.01)
fig.show()
else:
plt.close(fig)
def GetModel(self, x):
return self.DRM(x, **self._fit_values)
def GetModelIQR(self, x):
return self.DRM(x, **self._fit_values_s)
    def GetOriginalDistribution(self):
        return self._GD_data["x"], self._GD_data["y"]
def GetFitResults(self):
return self._fit_values, self._fit_errors
def Failed(self):
return self._failed
## PREFIT
def GetHistAndPeaks(self,
data,
bw,
peak_dist_factor,
peak_width_factor,
peak_ppf,
peak_prominence,
bin_method,
n_bootstrap,
n_bootstrap_kde,
bandwidth_kde,
kernel_kde,
alpha
):
self.scaler = RobustScaler()
data_s = np.squeeze(self.scaler.fit_transform(data.reshape(-1,1)))
if(bw is None):
ppf = self.EmpiricalPPF(data_s)
if(bin_method == "knuth"):
bw_s = knuth_bin_width(data_s[data_s<ppf(alpha)])
elif(bin_method=="freedman"):
bw_s = freedman_bin_width(data_s[data_s<ppf(alpha)])
elif(bin_method=="scott"):
bw_s = scott_bin_width(data_s[data_s<ppf(alpha)])
else:
raise Exception ("Please specify a bin width by setting 'bw' or choose a method from 'knuth', 'freedman' or 'scott'")
            # Cast bin counts to int so the {:d} formats below are valid.
            nbins_s = int(max(1, np.ceil((data_s.max() - data_s.min()) / bw_s)))
            bins_s = data_s.min() + bw_s * np.arange(nbins_s + 1)
            bw = bw_s*self.scaler.scale_[0]
            nbins = int(max(1, np.ceil((data.max() - data.min()) / bw)))
            bins = data.min() + bw * np.arange(nbins + 1)
        else:
            bw_s = bw/self.scaler.scale_[0]
            nbins = int(max(1, np.ceil((data.max() - data.min()) / bw)))
            bins = data.min() + bw * np.arange(nbins + 1)
            nbins_s = int(max(1, np.ceil((data_s.max() - data_s.min()) / bw_s)))
            bins_s = data_s.min() + bw_s * np.arange(nbins_s + 1)
if(nbins<50):
print("Binning with bw = {:3.3E} produced only {:d} bins, less than limit of 50 bins. Setting fit fail status and continuing...".format(bw, nbins))
self._failed = True
return
#Calculate histogram information
y_N, x = np.histogram(data, bins=bins, density=False)
N = np.sum(y_N)
y = y_N/N/bw
y_err = y*np.sqrt(1/y_N + 1/N)
x = (x[:-1] + x[1:])/2.
y_N_s, x_s = np.histogram(data_s, bins=bins_s, density=False)
N_s = np.sum(y_N_s)
y_s = y_N_s/N_s/bw_s
y_err_s = y_s*np.sqrt(1/y_N_s + 1/N_s)
x_s = (x_s[:-1] + x_s[1:])/2.
try:
x_kde_s, y_kde_s, y_kde_err_s, _, = self.BootstrapKDE(data_s,
kernel=kernel_kde,
bw=bandwidth_kde,
n_samples=n_bootstrap_kde,
alpha=alpha
)
f_lin_kde_s = interp1d(x_kde_s, np.arange(len(x_kde_s)), fill_value='extrapolate')
        except Exception:
print("Initial KDE failed. Setting fit fail status and continuing...")
self._failed = True
return
est_gain_s = self.FFTGain(x_kde_s, y_kde_s)
est_gain_lin = abs(f_lin_kde_s(est_gain_s) - f_lin_kde_s(0))
peaks, properties = find_peaks(y_kde_s,
distance=peak_dist_factor*est_gain_lin,
prominence=peak_prominence
)
if(len(peaks)<3):
print("Not enough peaks found with current distance criterion to fit. Setting fit fail status and continuing..")
self._failed=True
return
temp_widths = np.asarray(peak_widths(y_kde_s, peaks, rel_height=peak_width_factor))
temp_widths[2:] = x_kde_s[temp_widths[2:].astype(int)]
x_peaks_s = x_kde_s[peaks]
y_peaks_s = y_kde_s[peaks]
y_peaks_err_s = y_kde_err_s[peaks]
x_widths_s = temp_widths[3] - temp_widths[2]
pedestal_s = x_kde_s[peaks][0]
pedestal = pedestal_s*self.scaler.scale_[0] + self.scaler.center_[0]
est_gain = est_gain_s*self.scaler.scale_[0]
# cond_ped_kde_s = (x_kde_s>pedestal_s - est_gain_s/2)
# cond_ped_s = (x_s>pedestal_s - est_gain_s/2)
# cond_ped = (x>pedestal - est_gain/2)
# cond_ped_peaks = (x_peaks_s>pedestal_s - est_gain_s/2)
# x_kde_s = x_kde_s[cond_ped_kde_s]
# y_kde_s = y_kde_s[cond_ped_kde_s]
# y_kde_err_s = y_kde_err_s[cond_ped_kde_s]
# x_s = x_s[cond_ped_s]
# y_s = y_s[cond_ped_s]
# y_err_s = y_err_s[cond_ped_s]
# x_peaks_s = x_peaks_s[cond_ped_peaks]
# y_peaks_s = y_peaks_s[cond_ped_peaks]
# y_peaks_err_s = y_peaks_err_s[cond_ped_peaks]
# x_widths_s = x_widths_s[cond_ped_peaks]
# x = x[cond_ped]
# y = y[cond_ped]
# y_err = y_err[cond_ped]
est_mu_s, std_err_mu_s, _ = self.Bootstrap(data_s-pedestal_s,
self.EstMu,
n_samples=n_bootstrap)
est_lbda_s, std_err_lbda_s, _ = self.Bootstrap(data_s - pedestal_s,
self.EstLbda,
n_samples=n_bootstrap)
_, std_err_gain_s, _ = self.Bootstrap(data_s - pedestal_s,
self.EstGain,
n_samples=n_bootstrap)
self._peak_data["x_peak_s"] = []
self._peak_data["y_peak_s"] = []
self._peak_data["y_peak_err_s"] = []
self._peak_data["n_peak_s"] = []
self._peak_data["x_width_s"] = []
if(peak_ppf is not None):
peak_threshold = max(3, self.GPoisPPF(peak_ppf, est_mu_s, est_lbda_s, self._max_n_peaks))
if(self._verbose):
print("For {:3.3}% of distribution, accepting all peaks below {:d}".format(peak_ppf*100.0, peak_threshold))
        else:
            # No PPF cut requested: accept every found peak.
            peak_threshold = np.inf
            print("Accepting all peaks.")
n_valid_peaks = 0
for x_i, (y_peak, y_peak_err, x_peak, x_width) in enumerate(zip(y_peaks_s, y_peaks_err_s, x_peaks_s, x_widths_s)):
data_peak_s = self.SelectRange(data_s, x_peak-x_width/2, x_peak+x_width/2)
N_evt_peak = len(data_peak_s)
if(x_i==0):
est_x_0_s, std_err_x_0_s, _ = self.Bootstrap(data_peak_s,
np.mean,
n_samples=n_bootstrap)
if(x_i<peak_threshold):
if(self._verbose):
print("Peak {:d} has {:d} events. Adding...".format(x_i, N_evt_peak))
self._peak_data["x_peak_s"].append(x_peak)
self._peak_data["y_peak_s"].append(y_peak)
self._peak_data["y_peak_err_s"].append(y_peak_err)
self._peak_data["n_peak_s"].append(x_i)
self._peak_data["x_width_s"].append(x_width)
n_valid_peaks+=1
self._peak_data["x_peak_s"] = np.asarray(self._peak_data["x_peak_s"])
self._peak_data["y_peak_s"] = np.asarray(self._peak_data["y_peak_s"])
self._peak_data["y_peak_err_s"] = np.asarray(self._peak_data["y_peak_err_s"])
self._peak_data["n_peak_s"] = np.asarray(self._peak_data["n_peak_s"])
self._peak_data["x_width_s"] = np.asarray(self._peak_data["x_width_s"])
self._GD_data["x"] = x
self._GD_data["y"] = y
self._GD_data["y_err"] = y_err
self._GD_data["bw"] = bw
self._GD_data["N"] = N
self._GD_data["nbins"] = nbins
self._GD_data["x_s"] = x_s
self._GD_data["y_s"] = y_s
self._GD_data["y_err_s"] = y_err_s
self._GD_data["x_kde_s"] = x_kde_s
self._GD_data["y_kde_s"] = y_kde_s
self._GD_data["y_kde_err_s"] = y_kde_err_s
self._GD_data["bw_s"] = bw_s
self._GD_data["N_s"] = N_s
self._GD_data["nbins_s"] = nbins_s
self._GD_data["fit_GP_mu"] = max(est_mu_s, self._eps)
self._GD_data["fit_GP_mu_err"] = std_err_mu_s
self._GD_data["fit_GP_lbda"] = min(1, max(est_lbda_s, self._eps))
self._GD_data["fit_GP_lbda_err"] = std_err_lbda_s
self._GD_data["fit_G"] = est_gain_s
self._GD_data["fit_G_err"] = std_err_gain_s
self._GD_data["fit_x_0"] = est_x_0_s
self._GD_data["fit_x_0_err"] = std_err_x_0_s
self._GD_data["scaler"] = self.scaler
def PreFitGenPoisToPeaks(self, ppf_mainfit, n_bootstrap):
########
#STEP 1
########
fit_lbda = self._GD_data["fit_GP_lbda"]
fit_dlbda = self._GD_data["fit_GP_lbda_err"]
fit_x_0 = self._GD_data["fit_x_0"]
fit_mu = self._GD_data["fit_GP_mu"]
fit_dmu = self._GD_data["fit_GP_mu_err"]
x_peak = self._peak_data["x_peak_s"]
n_peak = self._peak_data["n_peak_s"]
y_peak = self._peak_data["y_peak_s"]
y_peak_err = self._peak_data["y_peak_err_s"]
log_y_peak = self.Logify(y_peak)
dlog_y_peak = y_peak_err/y_peak
fit_A_estspec = gpoisson.logpmf(n_peak, fit_mu, fit_lbda)
fit_A = np.nanmean(log_y_peak - fit_A_estspec)
fit_dA = np.nanmean(2*dlog_y_peak)
fit_dict_pois = {
"A": fit_A,
"mu":fit_mu,
"lbda":fit_lbda,
}
fit_pois = Chi2Regression(
self.DummyLogGPois,
n_peak,
log_y_peak,
dlog_y_peak
)
minuit_pois = Minuit(fit_pois, **fit_dict_pois)
        # Chi2Regression is a least-squares cost, so errordef must be 1.
        minuit_pois.errordef=Minuit.LEAST_SQUARES
        minuit_pois.strategy=2
minuit_pois.errors["mu"] = fit_dmu
minuit_pois.errors["lbda"] = fit_dlbda
minuit_pois.limits["lbda"] = (self._eps, 1-self._eps)
minuit_pois.limits["mu"] = (self._eps, None)
minuit_pois.migrad(ncall= self._n_call_minuit,
iterate =self._n_iterations_minuit)
minuit_pois.hesse()
if(not minuit_pois.valid):
if(self._verbose):
print('Minuit returned invalid for Generalised Poisson fit to peaks. Using basic estimated parameters instead...')
self._GD_data["fit_GP_mu"] = fit_mu
self._GD_data["fit_GP_mu_err"] = fit_dmu
self._GD_data["fit_GP_lbda"] = fit_lbda
self._GD_data["fit_GP_lbda_err"] = fit_dlbda
self._GD_data["fit_GP_A"] = fit_A
self._GD_data["fit_GP_A_err"] = fit_dA
else:
self._GD_data["fit_GP_mu"] = minuit_pois.values["mu"]
self._GD_data["fit_GP_mu_err"] = minuit_pois.errors["mu"]
self._GD_data["fit_GP_lbda"] = minuit_pois.values["lbda"]
self._GD_data["fit_GP_lbda_err"] = minuit_pois.errors["lbda"]
self._GD_data["fit_GP_A"] = minuit_pois.values["A"]
self._GD_data["fit_GP_A_err"] = minuit_pois.errors["A"]
k_low = 0
k_hi = self.GPoisPPF(ppf_mainfit,
self._GD_data["fit_GP_mu"],
self._GD_data["fit_GP_lbda"],
self._max_n_peaks
)
self._GD_data["fit_GP_k_low"] = k_low
self._GD_data["fit_GP_k_hi"] = k_hi
if(self._GD_data["fit_GP_k_hi"]<2):
if(self._verbose):
print("Too few primary Geiger peaks to fit. Artificially increasing max number of peaks from {:d} to 2...".format(self._GD_data["fit_GP_k_hi"]))
self._GD_data["fit_GP_k_hi"] = max(self._GD_data["fit_GP_k_hi"], 2)
elif(self._GD_data["fit_GP_k_hi"]>self._max_n_peaks):
if(self._verbose):
print("Too many primary Geiger peaks to fit. Artificially decreasing max number of peaks from {:d} to {:d}...".format(self._GD_data["fit_GP_k_hi"],
self._max_n_peaks))
self._GD_data["fit_GP_k_hi"] = min(self._GD_data["fit_GP_k_hi"], self._max_n_peaks)
def PreFitSigma(self, data, n_bootstrap):
peak_val = self._peak_data["n_peak_s"]
data_s = np.squeeze(self.scaler.transform(data.reshape(-1,1)))
self._peak_data["peak_var_s"]=[]
self._peak_data["peak_var_err_s"]=[]
cond_peak = np.ones(len(peak_val), bool)
for x_i, y_peak, x_peak, x_width in zip(self._peak_data["n_peak_s"],
self._peak_data["y_peak_s"],
self._peak_data["x_peak_s"],
self._peak_data["x_width_s"]):
data_range = self.SelectRange(data_s, x_peak - x_width/2, x_peak + x_width/2)
            try:
                est_var, std_err_var, _ = self.Bootstrap(data_range,
                                                         np.var,
                                                         n_samples=n_bootstrap)
                self._peak_data["peak_var_s"].append(est_var)
                self._peak_data["peak_var_err_s"].append(std_err_var)
                cond_peak[x_i] = True
            except Exception:
                cond_peak[x_i] = False
                # NaNs keep the arrays aligned with the peak list; they are
                # masked out downstream via cond_valid_peak.
                self._peak_data["peak_var_s"].append(np.nan)
                self._peak_data["peak_var_err_s"].append(np.nan)
if(sum(cond_peak)<3):
print("Fewer than 3 peaks had adequate statistics to determine peak width. Setting fit fail status...")
self._failed=True
return
self._peak_data["cond_valid_peak"] = cond_peak
self._peak_data["peak_var_s"] = np.asarray(self._peak_data["peak_var_s"])
self._peak_data["peak_var_err_s"] = np.asarray(self._peak_data["peak_var_err_s"])
n_peaks_select = self._peak_data["n_peak_s"][cond_peak]
peaks_var_select = self._peak_data["peak_var_s"][cond_peak]
peaks_var_err_select = self._peak_data["peak_var_err_s"][cond_peak]
fit_lin_var = HuberRegression(self.DummyLinear,
n_peaks_select,
peaks_var_select,
peaks_var_err_select
)
m_var_temp = np.median(np.gradient(peaks_var_select, n_peaks_select))
c_var_temp = peaks_var_select[np.argmin(peaks_var_select)]
fit_dict_lin_var = {
"m": m_var_temp,
"c": c_var_temp,
}
minuit_lin_var = Minuit(fit_lin_var, **fit_dict_lin_var)
minuit_lin_var.limits["m"] = (self._eps, None)
minuit_lin_var.strategy=2
minuit_lin_var.migrad(ncall= self._n_call_minuit,
iterate =self._n_iterations_minuit)
minuit_lin_var.hesse()
self._GD_data["fit_var0"] = minuit_lin_var.values["c"]
self._GD_data["fit_var0_err"] = minuit_lin_var.errors["c"]
self._GD_data["fit_var1"] = minuit_lin_var.values["m"]
self._GD_data["fit_var1_err"] = minuit_lin_var.errors["m"]
self._GD_data["fit_sigma0"] = np.sqrt(self._GD_data["fit_var0"])
self._GD_data["fit_sigma0_err"] = (0.5/self._GD_data["fit_sigma0"])*self._GD_data["fit_var0_err"]
self._GD_data["fit_sigma1"] = np.sqrt(self._GD_data["fit_var1"])
self._GD_data["fit_sigma1_err"] = (0.5/self._GD_data["fit_sigma1"])*self._GD_data["fit_var1_err"]
def PreFitDCR(self,
data,
tau,
tgate,
r_fast,
t_0,
ppf_mainfit,
kernel_kde,
bandwidth_kde,
n_bootstrap_kde,
dcr_lim=0.05):
G = self._GD_data["fit_G"]
mu = self._GD_data["fit_GP_mu"]
lbda = self._GD_data["fit_GP_lbda"]
x_0 = self._GD_data["fit_x_0"]
data_norm = (np.squeeze(self.scaler.transform(data.reshape(-1,1))) - x_0)/G
try:
x_dc_kde, y_dc_kde, y_dc_kde_err, _ = self.BootstrapKDE(
data_norm,
kernel=kernel_kde,
bw=bandwidth_kde,
n_samples=n_bootstrap_kde
)
        except Exception:
print("DCR KDE failed. Setting fit fail status and continuing...")
self._failed = True
return
cond_interpeak = (x_dc_kde>0) & (x_dc_kde<1)
x_dc_kde = x_dc_kde[cond_interpeak]
y_dc_kde = y_dc_kde[cond_interpeak]
y_dc_kde_err = y_dc_kde_err[cond_interpeak]
f_lin_kde_dc = interp1d(x_dc_kde, np.arange(0, len(x_dc_kde)), fill_value='extrapolate')
f_lin_kde_dc_inv = interp1d(np.arange(0, len(x_dc_kde)), x_dc_kde, fill_value='extrapolate')
f_log_kde_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde), fill_value='extrapolate')
f_log_kde_upper_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde + y_dc_kde_err), fill_value='extrapolate')
f_log_kde_lower_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde - y_dc_kde_err), fill_value='extrapolate')
        # (x_dc_kde, y_dc_kde and y_dc_kde_err were already restricted to the
        # 0 < x < 1 interpeak window above, so no further trimming is needed.)
x_dc_min = x_dc_kde[np.argmin(y_dc_kde)]
x_dc_lim_min_low = x_dc_min - dcr_lim
x_dc_lim_min_hi = x_dc_min + dcr_lim
x_dc_lim_low = 0
x_dc_lim_mid = x_dc_min
x_dc_lim_hi = f_lin_kde_dc_inv(f_lin_kde_dc(0) + 2*abs(f_lin_kde_dc(x_dc_min) - f_lin_kde_dc(0)))
cond_interpeak = (x_dc_kde> x_dc_lim_min_low) & (x_dc_kde<=x_dc_lim_min_hi)
cond_p_0toLM = (x_dc_kde> x_dc_lim_low) & (x_dc_kde<=x_dc_lim_mid)
cond_p_LMto2LM = (x_dc_kde> x_dc_lim_mid) & (x_dc_kde<=x_dc_lim_hi)
mean_p_interpeak = (quad(lambda x: np.exp(f_log_kde_dc(x)),
x_dc_lim_min_low,
x_dc_lim_min_hi)[0])/(2*dcr_lim)
mean_p_interpeak_upper = (quad(lambda x: np.exp(f_log_kde_upper_dc(x)),
x_dc_lim_min_low,
x_dc_lim_min_hi)[0])/(2*dcr_lim)
mean_p_interpeak_lower = (quad(lambda x: np.exp(f_log_kde_lower_dc(x)),
x_dc_lim_min_low,
x_dc_lim_min_hi)[0])/(2*dcr_lim)
dmean_p_interpeak = abs(mean_p_interpeak_upper - mean_p_interpeak_lower)
int_p_0toLM = quad(lambda x: np.exp(f_log_kde_dc(x)),
x_dc_lim_low,
x_dc_lim_mid)[0]
int_p_0toLM_upper = quad(lambda x: np.exp(f_log_kde_upper_dc(x)),
x_dc_lim_low,
x_dc_lim_mid)[0]
int_p_0toLM_lower = quad(lambda x: np.exp(f_log_kde_lower_dc(x)),
x_dc_lim_low,
x_dc_lim_mid)[0]
dint_p_0toLM = abs(int_p_0toLM_upper - int_p_0toLM_lower)
int_p_LMto2LM = quad(lambda x: np.exp(f_log_kde_dc(x)),
x_dc_lim_mid,
x_dc_lim_hi)[0]
int_p_LMto2LM_upper = quad(lambda x: np.exp(f_log_kde_upper_dc(x)),
x_dc_lim_mid,
x_dc_lim_hi)[0]
int_p_LMto2LM_lower = quad(lambda x: np.exp(f_log_kde_lower_dc(x)),
x_dc_lim_mid,
x_dc_lim_hi)[0]
dint_p_LMto2LM = abs(int_p_LMto2LM_upper - int_p_LMto2LM_lower)
        int_p = int_p_0toLM + 0.5*int_p_LMto2LM
        # Propagate the 0.5 weight in quadrature: sigma^2 = sigma_a^2 + (0.5*sigma_b)^2.
        dint_p = np.sqrt(dint_p_0toLM**2 + (0.5*dint_p_LMto2LM)**2)
DCR = mean_p_interpeak/(max(int_p, self._eps)*4*tau)
dDCR = DCR*np.sqrt((dmean_p_interpeak/max(mean_p_interpeak, self._eps))**2 + (dint_p/max(int_p, self._eps))**2)
mu_dcr= DCR*(abs(t_0) + tgate)
k_dcr_low = 0
k_dcr_hi = self.GPoisPPF(ppf_mainfit,
mu_dcr,
self._GD_data["fit_GP_lbda"],
self._max_n_dcr_peaks
)
self._DCR_data["x_dc_kde"] = x_dc_kde
self._DCR_data["y_dc_kde"] = y_dc_kde
self._DCR_data["y_dc_kde_err"] = y_dc_kde_err
self._DCR_data["cond_p_0toLM"] = cond_p_0toLM
self._DCR_data["cond_p_LMtoL2M"] = cond_p_LMto2LM
self._DCR_data["cond_interpeak"] = cond_interpeak
self._DCR_data["int_p_0toLM"] = int_p_0toLM
self._DCR_data["dint_p_0toLM"] = dint_p_0toLM
self._DCR_data["int_p_LMto2LM"] = int_p_LMto2LM
self._DCR_data["dint_p_LMto2LM"] = dint_p_LMto2LM
self._DCR_data["mean_p_interpeak"] = mean_p_interpeak
self._DCR_data["dmean_p_interpeak"] = dmean_p_interpeak
self._DCR_data["fit_tau"] = tau
self._DCR_data["fit_tgate"] = tgate
self._DCR_data["fit_t_0"] = t_0
self._DCR_data["fit_DCR"] = DCR
self._DCR_data["fit_DCR_err"] = dDCR
self._DCR_data["fit_dcr_k_low"] = k_dcr_low
self._DCR_data["fit_dcr_k_hi"] = k_dcr_hi
if(r_fast is None):
self._DCR_data["fit_r_fast"] = 0
else:
self._DCR_data["fit_r_fast"] = r_fast
if(self._DCR_data["fit_dcr_k_hi"]<2):
print("Too few dark peaks to fit. Artificially increasing max number of peaks from {:d} to 2...".format(self._DCR_data["fit_dcr_k_hi"]))
self._DCR_data["fit_dcr_k_hi"] = max(self._DCR_data["fit_dcr_k_hi"], 2)
        elif(self._DCR_data["fit_dcr_k_hi"]>self._max_n_dcr_peaks):
            print("Too many dark peaks to fit. Artificially decreasing max number of peaks from {:d} to {:d}...".format(self._DCR_data["fit_dcr_k_hi"],
                                                                                                                        self._max_n_dcr_peaks))
            self._DCR_data["fit_dcr_k_hi"] = min(self._DCR_data["fit_dcr_k_hi"], self._max_n_dcr_peaks)
def InitFit(self,
data,
bw,
bin_method,
tau,
t_0,
r_fast,
tgate,
n_bootstrap,
n_bootstrap_kde,
kernel_kde,
bandwidth_kde,
peak_dist_factor,
peak_width_factor,
peak_ppf,
peak_prominence,
ppf_mainfit,
alpha
):
        if(self._is_histogram):
            # Expand (bin centre, count) histogram rows into a flat array of charges.
            data = np.array(list(chain.from_iterable([[x]*int(y) for x, y in zip(data[:, 0], data[:, 1])])))
if(not self._failed):
if(self._verbose):
print("Getting histogram and peak values...")
self.GetHistAndPeaks(data,
bw,
peak_dist_factor,
peak_width_factor,
peak_ppf,
peak_prominence,
bin_method,
n_bootstrap,
n_bootstrap_kde,
bandwidth_kde,
kernel_kde,
alpha)
if(not self._failed):
if(self._verbose):
print("Fitting Generalised Poisson distribution...")
self.PreFitGenPoisToPeaks(ppf_mainfit, n_bootstrap)
if(not self._failed):
if(self._verbose):
print("Fitting peak widths via bootstrap resampling...")
self.PreFitSigma(data, n_bootstrap)
if(not self._failed):
if(self._verbose):
print("Estimating DCR from interpeak region...")
self.PreFitDCR(data,
tau,
tgate,
r_fast,
t_0,
ppf_mainfit,
kernel_kde,
bandwidth_kde,
n_bootstrap_kde)
if(not self._failed):
if(self._verbose):
print("Prefitting complete.")
# FIT
def Fit(self,
data,
**kwargs):
kwargs_fit = self._default_fit_kwargs.copy()
kwargs_fit.update(kwargs)
if(not isinstance(data,np.ndarray)):
raise Exception("Data is in {0} format. Please provide data in the format of a NumPy array.".format(type(data)))
else:
if(data.ndim==1):
if(self._verbose):
print("Data is 1D. Assuming list of charges as input...")
self._is_histogram = False
elif(data.ndim==2):
if(self._verbose):
print("Data is 2D. Assuming histogram as input...")
self._is_histogram = True
else:
raise Exception("Please provide data in the format of either a list of charges (1D) or a histogram (2D)")
if(kwargs_fit["t_0"] is None):
raise Exception("t_0 is not set. Please set t_0 as a keyword argument to be the integration period before gate in nanoseconds.")
if(kwargs_fit["tgate"] is None):
raise Exception("tgate is not set. Please set tgate as a keyword argument to be the integration gate length in nanoseconds.")
if(kwargs_fit["tau"] is None):
raise Exception("tau is not set. Please set tau as a keyword argument to be the slow component of the SiPM pulse in nanoseconds.")
if(kwargs_fit["r_fast"] is None):
raise Exception("r_fast is not set. Please set r_fast as a keyword argument to be the fraction of the fast component of the SiPM pulse in nanoseconds.")
print("Performing fit initialisation...")
self.Init()
self.InitFit(data, **kwargs_fit)
if(not self._failed):
print("Fitting...")
self.N = len(data)
G = self._GD_data["fit_G"]
dG = self._GD_data["fit_G_err"]
mu = self._GD_data["fit_GP_mu"]
dmu = self._GD_data["fit_GP_mu_err"]
lbda = self._GD_data["fit_GP_lbda"]
dlbda = self._GD_data["fit_GP_lbda_err"]
k_low = self._GD_data["fit_GP_k_low"]
k_hi = self._GD_data["fit_GP_k_hi"]
x_0 = self._GD_data["fit_x_0"]
dx_0 = self._GD_data["fit_x_0_err"]
sigma_0 = self._GD_data["fit_sigma0"]
sigma_1 = self._GD_data["fit_sigma1"]
dsigma_0 = self._GD_data["fit_sigma0_err"]
dsigma_1 = self._GD_data["fit_sigma1_err"]
dx = self._GD_data["bw_s"]
tau = self._DCR_data["fit_tau"]
tgate = self._DCR_data["fit_tgate"]
t_0 = self._DCR_data["fit_t_0"]
DCR = self._DCR_data["fit_DCR"]
dDCR = self._DCR_data["fit_DCR_err"]
r_fast = self._DCR_data["fit_r_fast"]
k_dcr_low = self._DCR_data["fit_dcr_k_low"]
k_dcr_hi =self._DCR_data["fit_dcr_k_hi"]
            # alpha and r_beta start from weakly informed guesses (0.2 each).
self.fit_dict = {
"mu":mu,
"G":G,
"x_0": x_0,
"sigma_0": sigma_0,
"sigma_1": sigma_1,
"alpha": 0.2,
"r_beta": 0.2,
"lbda":lbda,
"tau": tau,
"tgate":tgate,
"t_0": t_0,
"DCR":DCR,
"r_fast":r_fast,
"k_low":k_low,
"k_hi":k_hi,
"k_dcr_low":k_dcr_low,
"k_dcr_hi":k_dcr_hi,
"dx": dx,
}
bl = BinnedLH(self.DRM, self._GD_data["x_s"], self._GD_data["y_s"])
minuit = Minuit(bl, **self.fit_dict)
minuit.errors["alpha"] = 0.01
minuit.errors["r_beta"] = 0.01
            if(dmu is not None and not np.isnan(dmu)):
                minuit.errors["mu"] = abs(dmu)
            if(dG is not None and not np.isnan(dG)):
                minuit.errors["G"] = abs(dG)
            if(dx_0 is not None and not np.isnan(dx_0)):
                minuit.errors["x_0"] = abs(dx_0)
            if(dlbda is not None and not np.isnan(dlbda)):
                minuit.errors["lbda"] = abs(dlbda)
            if(dsigma_0 is not None and not np.isnan(dsigma_0)):
                minuit.errors["sigma_0"] = abs(dsigma_0)
            if(dsigma_1 is not None and not np.isnan(dsigma_1)):
                minuit.errors["sigma_1"] = abs(dsigma_1)
            if(dDCR is not None and not np.isnan(dDCR)):
                minuit.errors["DCR"] = abs(dDCR)
minuit.limits["mu"] = (self._eps, max(1, k_hi))
minuit.limits["G"] = (self._eps, None)
minuit.limits["x_0"] = (None, None)
minuit.limits["alpha"] = (self._eps, 1-self._eps)
minuit.limits["r_beta"] = (self._eps, 1-self._eps)
minuit.limits["lbda"] = (self._eps, 1-self._eps)
minuit.limits["sigma_0"] = (self._eps, G)
minuit.limits["sigma_1"] = (self._eps, G)
minuit.limits["DCR"] = (self._eps, None)
minuit.fixed["k_low"] = True
minuit.fixed["k_hi"] = True
minuit.fixed["tgate"] = True
minuit.fixed["tau"] = True
minuit.fixed["t_0"] = True
minuit.fixed["r_fast"] = True
minuit.fixed["dx"] = True
minuit.fixed["k_dcr_low"] = True
minuit.fixed["k_dcr_hi"] = True
minuit.fixed["tgate"] = True
minuit.strategy=2
minuit.errordef=minuit.LIKELIHOOD
minuit.simplex()
minuit.migrad(ncall= self._n_call_minuit,
iterate =self._n_iterations_minuit)
minuit.hesse()
if(self._verbose):
print(minuit)
self._fit_minuit = minuit
self._fit_values_s = minuit.values.to_dict()
self._fit_errors_s = minuit.errors.to_dict()
self._fit_values = {}
self._fit_errors = {}
for key, value in self._fit_values_s.items():
if(key=="x_0"):
self._fit_values[key] = value*self.scaler.scale_[0] + self.scaler.center_[0]
elif(key=="G"):
self._fit_values[key] = value*self.scaler.scale_[0]
elif(key=="sigma_0"):
self._fit_values[key] = value*self.scaler.scale_[0]
elif(key=="sigma_1"):
self._fit_values[key] = value*self.scaler.scale_[0]
elif(key=="dx"):
self._fit_values[key] = self._GD_data["bw"]
else:
self._fit_values[key] = value
for key, value in self._fit_errors_s.items():
if(key=="x_0"):
self._fit_errors[key] = value*self.scaler.scale_[0] + self.scaler.center_[0]
elif(key=="G"):
self._fit_errors[key] = value*self.scaler.scale_[0]
elif(key=="sigma_0"):
self._fit_errors[key] = value*self.scaler.scale_[0]
elif(key=="sigma_1"):
self._fit_errors[key] = value*self.scaler.scale_[0]
elif(key=="dx"):
self._fit_errors[key] = 0.1*self._GD_data["bw"]
else:
self._fit_errors[key] = value
data = []
prms = []
for par in self._fit_minuit.parameters:
if(self._fit_minuit.fixed[par]):
continue
prms.append(par)
data.append([self._fit_values[par], self._fit_errors[par]])
data = np.asarray(data)
prms = np.asarray(prms)
_print = pd.DataFrame(columns = prms,
index=["Values", "Errors"], data = data.T).T
_print.style.format({
'Values': lambda val: f'{val:,.2E}',
'Errors': lambda val: f'{val:,.2E}'
})
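# Minimal usage sketch (hypothetical values; not part of the original file).
# tau, t_0 and tgate are in nanoseconds, r_fast is a dimensionless fraction:
#
#   data = np.loadtxt("charges.txt")           # 1D array of charge values
#   f = PeakOTron(verbose=True)
#   f.Fit(data, tau=20.0, t_0=100.0, tgate=100.0, r_fast=0.2)
#   if not f.Failed():
#       fit_values, fit_errors = f.GetFitResults()
#       f.PlotFit(display=True)
#       f.PlotSummary(display=True)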