Skip to content
Snippets Groups Projects
Commit 4a3c50c5 authored by Jack Christopher Hutchinson Rolph's avatar Jack Christopher Hutchinson Rolph
Browse files

Update PeakOTron.py

parent 90dd5feb
No related branches found
No related tags found
No related merge requests found
from joblib import dump, load ## code starts###
import os
import numpy as np
import pandas as pd
from itertools import chain
from iminuit import Minuit
from iminuit.util import describe
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib import cm
from numba import njit
from difflib import ndiff
import re
from KDEpy import FFTKDE
import scipy.special as sc
from scipy.integrate import quad
from scipy.stats import poisson, skew, norm, binom
from scipy.interpolate import interp1d
from scipy.signal import find_peaks, peak_widths
from scipy.signal import fftconvolve as convolve
from astropy.stats import knuth_bin_width, freedman_bin_width, scott_bin_width, bootstrap
from sklearn.preprocessing import RobustScaler
from AdditionalPDFs import gpoisson, borel
class FakeFuncCode:
    """Mimic a function's ``__code__`` object so iminuit can discover parameter
    names from arbitrary callables (via ``iminuit.util.describe``).

    NOTE(review): this span of the scraped source interleaved an unrelated
    example-usage snippet with the class body; the class has been
    reconstructed from its fragments.

    Args:
        f: callable whose parameter names are introspected with describe().
        prmt: optional replacement names for the leading parameters.
        dock: number of leading parameters to drop (e.g. 1/True to drop 'x').
        append: extra parameter name(s) appended after the docked originals.
    """

    def __init__(self, f, prmt=None, dock=0, append=None):
        # f can either be tuple or function object
        self.co_varnames = describe(f)
        self.co_argcount = len(self.co_varnames)
        self.co_argcount -= dock
        self.co_varnames = self.co_varnames[dock:]
        if prmt is not None:
            # rename parameters from the front
            for i, p in enumerate(prmt):
                self.co_varnames[i] = p
        if isinstance(append, str):
            append = [append]
        if append is not None:
            old_count = self.co_argcount
            self.co_argcount += len(append)
            self.co_varnames = tuple(
                list(self.co_varnames[:old_count]) +
                append +
                list(self.co_varnames[old_count:]))
class BandWidthOptimiser:
    """KDE bandwidth optimiser usable as a Minuit cost function.

    Based on the leave-one-out cross-validation approach of Habbema, Hermans
    and Van den Broeck (1971) and Duin (1976);
    see https://cran.r-project.org/web/packages/kedd/vignettes/kedd.pdf
    """

    def _Dummy(self, _, bw):
        # Exists only so FakeFuncCode exposes 'bw' as the single fit parameter.
        return bw

    def _KDE(self, bw):
        """Negative log-likelihood of the KDE evaluated on its own grid."""
        try:
            x_kde, y_kde = FFTKDE(kernel=self.kernel, bw=bw).fit(self.data).evaluate(self.n_kde_samples)
            loss = -np.sum(np.log(y_kde))
        except Exception:
            # BUG FIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit. A failed KDE evaluation (e.g. a
            # non-positive bandwidth) is penalised with a huge loss instead.
            loss = self.eps_inv
        return loss

    def _PPF(self, data):
        """Return an interpolated empirical quantile function (inverse ECDF)."""
        x = np.sort(data)
        n = x.size
        y = np.arange(1, n+1) / n
        _ppf = interp1d(y, x)
        return _ppf

    def __init__(self, data, kernel="gaussian", alpha=0.95, n_kde_samples=2**13):
        """
        Args:
            data: 1-D sample for which the bandwidth is optimised.
            kernel: FFTKDE kernel name.
            alpha: upper quantile used to clip the sample tail (must be > 0.5).
            n_kde_samples: number of KDE evaluation points.

        Raises:
            ValueError: if alpha <= 0.5.
        """
        if(alpha <= 0.5):
            raise ValueError("alpha must be above 0.5")
        self.data = data
        self.PPF = self._PPF(self.data)
        # Clip the extreme upper tail so outliers do not dominate the loss.
        self.data = self.data[(self.data < self.PPF(alpha))]
        # NOTE(review): N is the size of the *unclipped* sample — confirm intended.
        self.N = len(data)
        self.kernel = kernel
        self.n_kde_samples = n_kde_samples
        self.last_arg = None
        # func_code lets iminuit discover the parameter name 'bw'.
        self.func_code = FakeFuncCode(self._Dummy, dock=True)
        self.eps = np.finfo(np.float64).eps * 10
        self.eps_inv = 1/self.eps

    def __call__(self, *arg):
        """Cost-function protocol for Minuit: forwards to _KDE."""
        self.last_arg = arg
        loss = self._KDE(*arg)
        return loss
class BinnedLH:
    """Binned Poisson likelihood-ratio cost function for iminuit.

    Wraps a density model f(x, *params) and a binned spectrum (x, y); calling
    the instance with parameter values returns the negative log-likelihood.
    """

    def __init__(self, f, x, y):
        self.f = f
        self.x = x
        self.y = y
        self.last_arg = None
        # Expose the model's parameter names (minus the leading 'x') to Minuit.
        self.func_code = FakeFuncCode(f, dock=True)
        self.ndof = len(self.y) - (self.func_code.co_argcount - 1)
        # Numerical floor used to keep logarithms finite.
        self.eps = np.finfo(np.float64).eps * 10
        self.eps_inv = 1/self.eps
        self.log_eps = np.log(self.eps)

    def Logify(self, y):
        """Elementwise log with values at or below eps clamped to log(eps)."""
        return np.log(np.maximum(y, self.eps))

    def __call__(self, *arg):
        self.last_arg = arg
        prediction = self.f(self.x, *arg)
        # Poisson likelihood ratio per bin: y*log(y_hat/y) + (y - y_hat).
        log_ratio = self.Logify(prediction) - self.Logify(self.y)
        per_bin = self.y * log_ratio + (self.y - prediction)
        return -np.sum(per_bin)
class Chi2Regression:
    """Weighted least-squares (chi-square) cost function for iminuit.

    Calling the instance with parameter values returns
    sum(((f(x, *params) - y) / y_err) ** 2).
    """

    def __init__(self, f, x, y, y_err, epsilon=1.35):
        self.f = f
        self.x = x
        self.y = y
        self.y_err = y_err
        self.eps = np.finfo(np.float64).eps * 10
        # Guard against zero uncertainties dividing the residuals.
        self.y_err[self.y_err < self.eps] = self.eps
        self.last_arg = None
        # Expose the model's parameter names (minus the leading 'x') to Minuit.
        self.func_code = FakeFuncCode(f, dock=True)
        self.ndof = len(self.y) - (self.func_code.co_argcount - 1)

    def __call__(self, *arg):
        self.last_arg = arg
        normalised_residuals = (self.f(self.x, *arg) - self.y) / self.y_err
        return np.sum(normalised_residuals ** 2)
class HuberRegression:
    """Huber-loss regression cost function for iminuit.

    Residuals below `delta` (in units of y_err) contribute quadratically,
    larger ones linearly, making the fit robust against outliers.
    """

    def __init__(self, f, x, y, y_err, delta=1.35):
        self.f = f
        self.x = x
        self.y = y
        self.y_err = y_err
        self.delta = delta
        self.eps = np.finfo(np.float64).eps * 10
        # Guard against zero uncertainties dividing the residuals.
        self.y_err[self.y_err < self.eps] = self.eps
        self.last_arg = None
        # Expose the model's parameter names (minus the leading 'x') to Minuit.
        self.func_code = FakeFuncCode(f, dock=True)
        self.ndof = len(self.y) - (self.func_code.co_argcount - 1)

    def __call__(self, *arg):
        self.last_arg = arg
        scaled = abs((self.y - self.f(self.x, *arg)) / self.y_err)
        outlier = scaled > self.delta
        # Quadratic core, linear tails (classic Huber loss).
        pointwise = np.where(outlier,
                             self.delta * (scaled - 0.5 * self.delta),
                             0.5 * scaled ** 2)
        return np.sum(np.sum(pointwise, -1))
@njit
def _SelectRangeNumba(array, low_lim, hi_lim):
    # JIT-compiled helper: keep entries with low_lim <= v <= hi_lim (inclusive).
    index = (array >= low_lim) & (array <= hi_lim)
    return array[index]
class PeakOTron:
def __init__(self,
             verbose=False,
             n_call_minuit=1000,
             n_iterations_minuit=10
             ):
    """Create a PeakOTron fitter.

    Args:
        verbose: print progress information while fitting.
        n_call_minuit: max number of Minuit calls per migrad invocation.
        n_iterations_minuit: number of migrad iterate() passes.
    """
    self.Init()
    # Greek parameter stems; each is stored capitalised and lower-case
    # ("Lbda"/"lbda" is this file's spelling of lambda).
    _stems = ["alpha", "beta", "gamma", "delta", "epsilon", "zeta",
              "eta", "theta", "iota", "kappa", "lbda", "mu", "nu",
              "xi", "omicron", "pi", "rho", "sigma", "tau", "upsilon",
              "phi", "chi", "psi", "omega"]
    self.greek_letters = [variant
                          for stem in _stems
                          for variant in (stem.capitalize(), stem)]
    # Numerical tolerances and constants.
    self._eps = np.finfo(np.float64).eps * 10
    self._eps_kde = 1e-6
    self._FWHM2Sigma = 1/(2*np.sqrt(2*np.log(2)))
    # Plot styling defaults.
    self._plot_figsize = (10, 10)
    self._plot_fontsize = 20
    self._cmap = cm.get_cmap('viridis')
    # Fit configuration defaults.
    self._max_n_peaks = 12
    self._max_n_dcr_peaks = 6
    self._len_DCR_pad = int(100)
    self._verbose = verbose
    self._n_call_minuit = n_call_minuit
    self._n_iterations_minuit = n_iterations_minuit
    self._is_histogram = False
    self._default_fit_kwargs = {
        "tau": None,
        "t_0": None,
        "r_fast": None,
        "tgate": None,
        "pedestal": None,
        "n_bootstrap": 1000,
        "n_bootstrap_kde": 100,
        "bw": None,
        "peak_dist_factor": 0.8,
        "peak_width_factor": 0.5,
        "prominence": 0.0,
        "min_n_peak_evts": 100,
        "kernel_kde": "gaussian",
        "bandwidth_kde": "ISJ",
        "ppf_mainfit": 1 - 1e-6,
        "bin_method": "knuth"
    }
def LevDist(self, str1, str2):
    """Approximate Levenshtein distance between two strings using difflib's
    character-level ndiff: per matched run, the larger of the pending
    insert/delete counts is charged."""
    pending = {"+": 0, "-": 0}
    total = 0
    for diff_line in ndiff(str1, str2):
        code = diff_line[0]
        if code == " ":
            # A matching character closes the current edit run.
            total += max(pending.values())
            pending = {"+": 0, "-": 0}
        else:
            pending[code] += 1
    # Charge whatever edit run is still open at the end.
    return total + max(pending.values())
###HELPER FUNCTIONS
def Init(self):
    """Reset all per-fit state so the instance can be reused on a new spectrum."""
    self._failed = False
    self._fit = None
    self._fit_values = {}
    self._fit_errors = {}
    self._peak_data = {}
    self._GD_data = {}
    self._DCR_data = {}
def LatexFormat(self, f, scirange=[0.01, 1000]):
    """Format a number as a LaTeX math string.

    Values with |f| inside `scirange` are shown with 3 decimals; everything
    else in scientific notation ($m \\times 10^{e}$). Falls back to str(f)
    if formatting fails.
    """
    if(np.abs(f) > scirange[0] and np.abs(f) < scirange[1]):
        float_str = r"${:3.3f}$".format(f)
    else:
        try:
            float_str = "{:3.3E}".format(f)
            base, exponent = float_str.split("E")
            float_str = r"${0} \times 10^{{{1}}}$".format(base, int(exponent))
        except (TypeError, ValueError):
            # BUG FIX: was a bare `except:`; only formatting/parsing failures
            # (e.g. non-numeric input) should trigger the fallback.
            float_str = str(f)
    return float_str
def SelectRange(self, array, low_lim, hi_lim):
    """Return elements of `array` inside [low_lim, hi_lim] (inclusive), via the numba helper."""
    return _SelectRangeNumba(array, low_lim, hi_lim)
def Logify(self, y):
    """Elementwise log of y, with non-positive entries replaced by the
    smallest log value found among the positive entries."""
    logs = np.log(y)
    floor = np.min(logs[y > 0])
    return np.where(y > 0, logs, floor)
def SetMaxNPeaks(self, max_n_peaks):
    """Set the maximum number of photoelectron peaks used in the fit.

    Raises:
        Exception: if max_n_peaks (after int conversion) is below 1.
    """
    max_n_peaks = int(max_n_peaks)
    if(max_n_peaks < 1):
        # BUG FIX: the message claimed "greater than 2" while the check
        # enforces >= 1; the message now matches the actual condition.
        raise Exception("Maximum number of peaks must be at least 1.")
    self._max_n_peaks = max_n_peaks
    print("Set maximum number of peaks to {:d}.".format(self._max_n_peaks))
def SetMaxNDCRPeaks(self, max_n_dcr_peaks):
    """Set the maximum number of dark-count peaks used in the fit.

    Raises:
        Exception: if max_n_dcr_peaks (after int conversion) is below 2.
    """
    max_n = int(max_n_dcr_peaks)
    # BUG FIX: the original computed the int-converted value but then stored
    # (and range-checked) the raw argument, so a float/str could be stored.
    if(max_n < 2):
        raise Exception("Maximum number of peaks must be at least 2.")
    self._max_n_dcr_peaks = max_n
    print("Set maximum number of dark peaks to {:d}.".format(self._max_n_dcr_peaks))
def DummyLinear(self, x, m, c):
    """Straight line m*x + c, used for the gain and variance regressions."""
    return c + m * x
def DummyLogGPois(self, k, mu, lbda, A):
    """Log generalised-Poisson pmf plus a free vertical offset A; used to fit the extracted peak heights."""
    return A + gpoisson.logpmf(k, mu, lbda)
def SigmaK(self, k, sigma_0, sigma_1):
    """Width of the k-th peak: quadrature sum of the pedestal width sigma_0
    and k copies of the per-peak spread sigma_1."""
    variance = sigma_0 ** 2 + k * sigma_1 ** 2
    return np.sqrt(variance)
def PoisPPF(self, q, mu):
    """Poisson quantile function (inverse CDF), vectorised over q.

    Inverts the CDF with scipy's pdtrik, then corrects the candidate down by
    one where the smaller value already reaches probability q.
    """
    upper = np.ceil(sc.pdtrik(q, mu))
    lower = np.maximum(upper - 1, 0)
    cdf_at_lower = sc.pdtr(lower, mu)
    return np.where(cdf_at_lower >= q, lower, upper)
def GPoisPPF(self, q, mu, lbda, k_max):
    """Smallest k whose generalised-Poisson CDF reaches q, scanning k = 0..k_max.

    Returns k_max + 1 if the cumulative probability never reaches q.
    """
    cumulative = 0
    kk = 0
    while kk <= k_max:
        cumulative += gpoisson.pmf(kk, mu, lbda)
        if cumulative >= q:
            break
        kk += 1
    return kk
def EstLbda(self, data_sub_x_0):
    """Moment-based estimate of the generalised-Poisson branching parameter
    lambda from pedestal-subtracted data (mean, std and skewness)."""
    sample_skew = skew(data_sub_x_0)
    sample_mean = np.mean(data_sub_x_0)
    sample_std = np.std(data_sub_x_0)
    return 0.5 * (((sample_mean * sample_skew) / (sample_std)) - 1)
def EstGain(self, data_sub_x_0):
    """Moment-based gain estimate from pedestal-subtracted data, corrected
    for the estimated generalised-Poisson branching parameter."""
    sample_skew = skew(data_sub_x_0)
    sample_mean = np.mean(data_sub_x_0)
    sample_std = np.std(data_sub_x_0)
    branching = 0.5 * (((sample_mean * sample_skew) / (sample_std)) - 1)
    return (sample_std**2 / sample_mean) * ((1 - branching)**2)
def EstMu(self, data_sub_x_0):
    """Moment-based estimate of the generalised-Poisson mean mu from
    pedestal-subtracted data, corrected for the branching parameter."""
    sample_skew = skew(data_sub_x_0)
    sample_mean = np.mean(data_sub_x_0)
    sample_std = np.std(data_sub_x_0)
    branching = 0.5 * (((sample_mean * sample_skew) / (sample_std)) - 1)
    return (1/(1 - branching)) * (sample_mean**2 / sample_std**2)
def Bootstrap(self, data, statistic, alpha=0.95, n_samples=10000):
    """Bias-corrected bootstrap of `statistic` over `data`.

    Args:
        data: 1-D sample.
        statistic: callable mapping a sample to a scalar.
        alpha: confidence level for the returned interval, in (0, 1).
        n_samples: number of bootstrap resamples (must be positive).

    Returns:
        (estimate, standard error, (low, high) confidence interval).
    """
    if not (0 < alpha < 1):
        raise ValueError("confidence level must be in (0, 1)")
    # BUG FIX: the original raised "data must contain at least one
    # measurement." here, but the check actually guards the resample count.
    if n_samples <= 0:
        raise ValueError("n_samples must be at least 1.")
    boot_stat = bootstrap(data, n_samples, bootfunc=statistic)
    stat_data = statistic(data)
    mean_stat = np.mean(boot_stat)
    # Bias correction: 2*theta_hat - mean(bootstrap replicas).
    est_stat = 2*stat_data - mean_stat
    std_err = np.std(boot_stat)
    # Two-sided normal quantile for the requested confidence level.
    z_score = np.sqrt(2.0)*sc.erfinv(alpha)
    conf_interval = est_stat + z_score*np.array((-std_err, std_err))
    return est_stat, std_err, conf_interval
def BootstrapKDE(self, data, kernel="gaussian", bw="optimise",
                 n_kde_samples=2**13,
                 alpha=0.95,
                 n_samples=1000,
                 limits=None
                 ):
    """Bias-corrected bootstrap estimate of a KDE of `data`.

    Args:
        data: 1-D sample.
        kernel: FFTKDE kernel name.
        bw: float bandwidth, an FFTKDE bandwidth-method name, or "optimise"
            to tune the bandwidth with Minuit via BandWidthOptimiser.
        n_kde_samples: number of KDE grid points.
        alpha: confidence level for the returned band.
        n_samples: number of bootstrap resamples.
        limits: optional (lo, hi) range to which the returned grid is cut.

    Returns:
        (x_kde, y_kde, y_kde_err, y_kde_conf_interval).
    """
    if not (0 < alpha < 1):
        raise ValueError("Bootstrap confidence level, alpha, must be in (0, 1).")
    n = n_samples
    if n <= 0:
        raise ValueError("Bootstrap data must contain at least one measurement.")
    # Validate kernel / bandwidth choices against what FFTKDE supports.
    kernels = list(FFTKDE._available_kernels.keys())
    if kernel not in kernels:
        raise ValueError("Selected KDE kernel not available. Please choose from these options: {:s}.".format(", ".join(kernels)))
    if(not isinstance(bw, float)):
        bws = list(FFTKDE._bw_methods.keys()) + ["optimise"]
        if bw not in bws:
            raise ValueError("Selected KDE bandwidth estimation method not available. Please choose from these options: {:s}.".format(", ".join(bws)))
        if(bw == "optimise"):
            try:
                # Tune bw with Minuit, seeded at half the Scott bin width.
                BWO = BandWidthOptimiser(data, kernel=kernel)
                bw_scott = scott_bin_width(data)
                minuit_kde = Minuit(BWO, bw=bw_scott/2)
                minuit_kde.limits["bw"] = (self._eps_kde, 3*bw_scott)
                minuit_kde.strategy = 2
                minuit_kde.migrad(ncall=self._n_call_minuit,
                                  iterate=self._n_iterations_minuit)
                bw = minuit_kde.values["bw"]
                if(self._verbose):
                    print("KDE Bandwidth optimised to: {0}".format(bw))
            except:
                # NOTE(review): bare except also swallows KeyboardInterrupt —
                # consider narrowing to Exception.
                if(self._verbose):
                    print("KDE Bandwidth optimisation failed. Defaulting to Improved Sheather Jones.")
                bw = "ISJ"
    kde = FFTKDE(kernel=kernel, bw=bw).fit(data)
    x_kde, y_kde_orig = kde.evaluate(n_kde_samples)
    # Re-evaluate the KDE on each bootstrap resample, on the same grid.
    boot_data = bootstrap(data, n_samples)
    y_kde_bs = np.vstack([FFTKDE(kernel=kernel, bw=bw).fit(_data).evaluate(x_kde)
                          for _data in boot_data])
    y_kde_mean = np.mean(y_kde_bs, axis=0)
    # Bias correction: 2*estimate - bootstrap mean.
    y_kde = 2*y_kde_orig - y_kde_mean
    y_kde_err = np.std(y_kde_bs, axis=0)
    z_score = np.sqrt(2.0)*sc.erfinv(alpha)
    y_kde_conf_interval = y_kde + z_score*np.array((-y_kde_err, y_kde_err))
    if(limits is not None):
        # Restrict the returned grid; note the confidence interval is NOT cut.
        cond_inrange = (x_kde > np.min(limits)) & (x_kde < np.max(limits))
        x_kde = x_kde[cond_inrange]
        y_kde = y_kde[cond_inrange]
        y_kde_err = y_kde_err[cond_inrange]
    return x_kde, y_kde, y_kde_err, y_kde_conf_interval
##MODEL DEFINITIONS
def DRM_basic(self, x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi):
    """Light-only detector response: generalised-Poisson-weighted Gaussian
    peaks plus continuum terms from XTM, summed over peak indices.

    Args:
        x: charge axis.
        mu, lbda: generalised-Poisson mean and branching parameter.
        x_0, G: pedestal position and gain.
        sigma_0, sigma_1: pedestal and per-peak widths.
        alpha: per-peak binomial probability for the XTM continuum terms.
        r_beta: slope parameter passed to XTM.
        k_low, k_hi: half-open range of peak indices considered.
    """
    k = np.arange(k_low, k_hi)
    k = sorted(list(k[k != 0]))          # nonzero peak indices; pedestal handled separately
    k_is = [k[0:int(_k)] for _k in k]    # continuum multiplicities considered for each peak
    # Pedestal: probability of zero fired cells times a Gaussian at x_0.
    f0 = gpoisson.pmf(0, mu, lbda)*norm.pdf(x, x_0, sigma_0)
    # Pure Gaussian peaks: k fired cells, zero binomial successes.
    f1s = np.asarray([
        binom.pmf(0, _k, alpha)*gpoisson.pmf(_k, mu, lbda)*norm.pdf(x, x_0 + _k*G, self.SigmaK(_k, sigma_0, sigma_1))
        for _k in k
    ])
    # Continuum terms: _i binomial successes out of _k, shaped by XTM.
    f2s = np.vstack([
        np.asarray([
            binom.pmf(_i, _k, alpha)*gpoisson.pmf(_k, mu, lbda)*self.XTM(x, _k, _i, x_0, G, sigma_0, sigma_1, r_beta)
            for _i in _is
        ])
        for _k, _is in zip(k, k_is)
    ])
    f = np.sum(np.vstack([f0, f1s, f2s]), axis=0)
    return f
def XTM(self, x, k, i, x_0, G, sigma_0, sigma_1, r_beta):
x_k = x - (x_0 + k*G)
_sigma_k = self.SigmaK(k, sigma_0, sigma_1)
x_ct_pad = np.zeros_like(x_k)
if(i==1):
cond = (x_k > -5*_sigma_k)
error_f = 0.5*(1.0 + sc.erf(x_k/(_sigma_k*np.sqrt(2.0))))
log_f = -x_k/(r_beta*G) - np.log(r_beta*G)
x_ct = np.where(cond, np.exp(log_f)*error_f, x_ct_pad)
elif(i>1):
cond = (x_k > 0)
log_f = sc.xlogy(i-1, x_k) - sc.gammaln(i) - sc.xlogy(i, r_beta*G) - x_k/(r_beta*G)
x_ct = np.where(cond, np.exp(log_f), x_ct_pad)
else:
x_ct = x_k
return x_ct
def PH_range(self, t, t_0, r_fast, tau, tgate):
    """Min/max normalised pulse height for a dark pulse arriving at time t.

    Two sensitive windows exist: (t_0, 0) before the gate and (0, tgate)
    inside the gate; anywhere else contributes (0, 0).
    """
    arrives_before_gate = (t > t_0) and (t < 0)
    arrives_in_gate = (t > 0) and (t < tgate)
    if arrives_before_gate:
        PH_min = (1 - r_fast)*np.exp(t_0/tau)*(1 - np.exp(-tgate/tau))
        PH_max = (1 - r_fast)*(1 - np.exp(-tgate/tau))
        return PH_min, PH_max
    if arrives_in_gate:
        return r_fast, (1 - (1 - r_fast)*np.exp(-tgate/tau))
    return 0, 0
def dpdPH(self, x, x_0, G, tau, tgate, t_0, r_fast):
    """Single dark-pulse pulse-height density dP/dPH on the gain-normalised
    axis, non-zero on the before-gate and in-gate PH bands."""
    lo_before, hi_before = self.PH_range(-abs(t_0)/2, -abs(t_0), r_fast, tau, tgate)
    lo_gate, hi_gate = self.PH_range(tgate/2, -abs(t_0), r_fast, tau, tgate)
    u = (x - x_0)/(G)
    density = np.zeros_like(u)
    band_before = (u > lo_before) & (u <= hi_before)
    band_gate = (u > lo_gate) & (u <= hi_gate)
    # 1/PH shape before the gate, 1/(1-PH) shape inside the gate.
    density[band_before] += 1/u[band_before]
    density[band_gate] += 1/(1 - u[band_gate])
    return density
def ApplyDCR(self, x, y_light, x_0, G, DCR, tgate, t_0, tau, r_fast, lbda, dx, k_dcr_low, k_dcr_hi):
    """Convolve the light-only spectrum with the dark-count spectrum.

    Builds the n-fold dark pulse-height spectra by repeated convolution on a
    zero-padded index axis, weights them with Poisson (and Borel for the
    cross-talk branch) probabilities, convolves with y_light, and returns the
    normalised model restricted to the original axis.
    """
    # Expected number of dark pulses inside the sensitive window.
    mu_dcr = DCR*(abs(t_0) + tgate)
    # Index-linearised and padded axis so convolutions do not wrap into data.
    x_lin = np.arange(0, len(x))
    x_pad_lin = np.arange(-self._len_DCR_pad, len(x)+self._len_DCR_pad)
    f_lin = interp1d(x, x_lin, fill_value='extrapolate')
    G_lin = f_lin(x_0+G) - f_lin(x_0)   # gain expressed in index units
    x_0_lin = f_lin(x_0)
    # Boolean mask mapping the padded axis back onto the original axis.
    idx_orig = np.where(np.in1d(x_pad_lin,
                                np.intersect1d(
                                    x_pad_lin,
                                    x_lin
                                )
                                ), True, False)
    idx_x_0_lin = np.argmin(abs(x_pad_lin - x_0_lin))
    n_dcr = np.arange(k_dcr_low, max(2, k_dcr_hi))
    fs = []
    pfs = []
    hs = []
    phs = []
    for _n_dcr in n_dcr:
        if(_n_dcr == 0):
            # Zero dark pulses: delta function at the pedestal position.
            f = np.zeros(len(x_pad_lin))
            f[idx_x_0_lin] = 1
            pf = poisson.pmf(0, mu_dcr)
            h = np.zeros(len(x_pad_lin))
            ph = 0
        else:
            if(len(fs) <= 1):
                f = self.dpdPH(x_pad_lin, x_0_lin, G_lin, tau, tgate, t_0, r_fast)
            else:
                # n-fold spectrum: convolve the 1-pulse spectrum with the
                # (n-1)-fold one, re-centred at the pedestal index.
                f = convolve(fs[1], fs[-1])[idx_x_0_lin:len(x_pad_lin)+idx_x_0_lin]
            f = f / np.trapz(f, dx=1)
            pf = poisson.pmf(_n_dcr, mu_dcr)*(borel.pmf(0, lbda)**_n_dcr)
            # Cross-talk branch: one primary pulse with _n_dcr Borel descendants,
            # producing an (n+1)-gain pulse-height band.
            h = self.dpdPH(x_pad_lin, x_0_lin, G_lin*(_n_dcr+1), tau, tgate, t_0, r_fast)
            h = h / np.trapz(h, dx=1)
            ph = poisson.pmf(1, mu_dcr)*borel.pmf(_n_dcr, lbda)/(_n_dcr+1)
        fs.append(f)
        pfs.append(pf)
        hs.append(h)
        phs.append(ph)
    fs = np.asarray(fs)
    hs = np.asarray(hs)
    pfs = np.expand_dims(np.asarray(pfs), -1)
    phs = np.expand_dims(np.asarray(phs), -1)
    y_dark = np.sum(fs*pfs, axis=0) + np.sum(hs*phs, axis=0)
    # NOTE(review): this normalises by the integral of the *last* h produced
    # in the loop, not of y_dark itself — verify this is intended.
    y_dark = y_dark/np.trapz(h, dx=1)
    y_model = convolve(y_dark,
                       np.pad(y_light,
                              (self._len_DCR_pad, self._len_DCR_pad),
                              "constant",
                              constant_values=(0.0, 0.0)),
                       )[idx_x_0_lin:len(x_pad_lin)+idx_x_0_lin]
    # Normalise to unit integral in the original charge units.
    y_model = y_model/np.trapz(y_model, dx=1)/dx
    y_model = y_model[idx_orig]
    return y_model
def DRM_nodcr(
        self,
        x,
        mu,
        x_0,
        G,
        sigma_0,
        sigma_1,
        alpha,
        r_beta,
        lbda,
        k_low,
        k_hi):
    """Detector response model without the dark-count convolution.

    Thin wrapper that forwards all parameters to DRM_basic.
    """
    # BUG FIX: the original passed a stray, undefined `dx` argument to
    # DRM_basic (which has no such parameter), raising NameError at call time.
    y_model = self.DRM_basic(x,
                             mu,
                             x_0,
                             G,
                             sigma_0,
                             sigma_1,
                             alpha,
                             r_beta,
                             lbda,
                             k_low,
                             k_hi
                             )
    return y_model
def DRM(
        self,
        x,
        mu,
        x_0,
        G,
        sigma_0,
        sigma_1,
        alpha,
        r_beta,
        lbda,
        k_low,
        k_hi,
        k_dcr_low,
        k_dcr_hi,
        DCR,
        tau,
        tgate,
        t_0,
        r_fast,
        dx
        ):
    """Full detector response model: the light-only spectrum (DRM_basic)
    convolved with the dark-count spectrum (ApplyDCR).

    Args:
        x: charge axis; dx: bin width used for final normalisation.
        mu, lbda: generalised-Poisson mean and branching parameter.
        x_0, G, sigma_0, sigma_1, alpha, r_beta: peak-shape parameters.
        k_low, k_hi / k_dcr_low, k_dcr_hi: peak index ranges (light / dark).
        DCR, tau, tgate, t_0, r_fast: dark-count rate and pulse timing.
    """
    # Light-only spectrum.
    y_light = self.DRM_basic(x,
                             mu,
                             x_0,
                             G,
                             sigma_0,
                             sigma_1,
                             alpha,
                             r_beta,
                             lbda,
                             k_low,
                             k_hi
                             )
    # Convolve with the dark-count model.
    y_model = self.ApplyDCR(x,
                            y_light,
                            x_0,
                            G,
                            DCR,
                            tgate,
                            t_0,
                            tau,
                            r_fast,
                            lbda,
                            dx,
                            k_dcr_low,
                            k_dcr_hi)
    return y_model
###PLOTTING
def PlotOriginalHistogram(self, ax):
    """Plot the IQR-normalised input spectrum on the given axis (log y-scale)."""
    ax.plot(self._GD_data["x_s"],
            self._GD_data["y_s"],
            lw=5,
            label="Data")
    ax.set_title("Normalised \n Input \n Distribution", fontsize=self._plot_fontsize)
    ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.set_yscale("log")
    ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotKDE(self, ax):
    """Overlay the kernel density estimate on the scaled spectrum (log y-scale)."""
    ax.plot(self._GD_data["x_s"],
            self._GD_data["y_s"],
            label="Data",
            lw=5
            )
    ax.plot(self._GD_data["x_kde_s"],
            self._GD_data["y_kde_s"],
            lw=2.5,
            label="KDE")
    ax.set_title("Kernel Density Estimate", fontsize=self._plot_fontsize)
    ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.set_yscale("log")
    ax.set_ylim([1e-5, None])
    ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotPeaks(self, ax):
    """Plot the scaled spectrum with the found peaks, their widths and the
    pedestal threshold; each peak gets an annotated vertical line."""
    ax.plot(self._GD_data["x_s"],
            self._GD_data["y_s"],
            label="Input Pulse \n Height \n Spectrum",
            lw=5
            )
    # One colour per peak, sampled from the viridis colormap.
    color_vals = [self._cmap(_v) for _v in np.linspace(0, 0.75, len(self._peak_data["x_peak_s"]))]
    for _i_x, (_x, _x_w, _c_v) in enumerate(zip(
            self._peak_data["x_peak_s"],
            self._peak_data["x_width_s"],
            color_vals
            )):
        _x_peak = _x
        _x_width_low = _x - _x_w/2
        _x_width_hi = _x + _x_w/2
        # Spectrum heights at the peak and its width edges, for the vlines.
        _y_peak = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_peak))]
        _y_width_low = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_width_low))]
        _y_width_hi = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_width_hi))]
        if(_i_x == 0):
            annotation_text = "Pedestal"
            color = "red"
        else:
            annotation_text = "Peak {:d}".format(_i_x)
            color = _c_v
        ax.vlines(x=[_x],
                  ymin=0,
                  ymax=_y_peak,
                  color=color,
                  linestyle="-",
                  lw=2.5,
                  label='\n{:s}: \n $x_{{pos}}$ = {:3.3f} Q.U \n $\Delta x_{{pos}}$ {:3.3f} Q.U \n'.format(
                      annotation_text,
                      _x_peak,
                      _x_width_hi - _x_width_low)
                  )
        ax.vlines(x=[_x_width_low],
                  ymin=0,
                  ymax=_y_width_low,
                  color=color,
                  lw=2.5,
                  linestyle="--")
        ax.vlines(x=[_x_width_hi],
                  ymin=0,
                  ymax=_y_width_hi,
                  color=color,
                  lw=2.5,
                  linestyle="--")
    # Pedestal threshold line (pedestal - estimated gain / 2).
    ax.axvline(x=[self._GD_data["limit_s"]],
               color="purple",
               lw=5,
               label="Pedestal Threshold ($PH_{{0}} - \\frac{G_{est}}{2}$)")
    box = ax.get_position()
    # Shrink the axis to leave room for the legend on the right.
    ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
    # Put a legend to the right of the current axis
    ax.legend(loc='center left',
              bbox_to_anchor=(1, 0.5),
              fontsize=max(10, self._plot_fontsize//2),
              ncol=len(self._peak_data["x_peak_s"])//6 + 1
              )
    ax.set_title("Peak Finding \n # Peaks = {:d}".format(len(self._peak_data["x_peak_s"])),
                 fontsize=self._plot_fontsize)
    ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.set_yscale("log")
def PlotGenPois(self, ax):
    """Plot log peak heights vs peak number with the fitted generalised-Poisson curve."""
    ax.scatter(self._peak_data["n_peak_s"],
               self.Logify(self._peak_data["y_peak_s"]),
               label="Extracted Peak Heights"
               )
    # Evaluate the fitted log generalised-Poisson model at the peak numbers.
    y_lgp = self.DummyLogGPois(self._peak_data["n_peak_s"],
                               self._GD_data["fit_GP_mu"],
                               self._GD_data["fit_GP_lbda"],
                               self._GD_data["fit_GP_A"]
                               )
    ax.plot(self._peak_data["n_peak_s"],
            y_lgp,
            linestyle="--",
            lw=2.5,
            color="red",
            label="Fit")
    ax.set_title(
        "Generalised Poisson Fit \n $\\mu$ = {0} $\pm$ {1} \n $\\lambda$ = {2} $\pm$ {3} \n $A$ = {4} $\pm$ {5}".format(
            self.LatexFormat(self._GD_data["fit_GP_mu"]),
            self.LatexFormat(self._GD_data["fit_GP_mu_err"]),
            self.LatexFormat(self._GD_data["fit_GP_lbda"]),
            self.LatexFormat(self._GD_data["fit_GP_lbda_err"]),
            self.LatexFormat(self._GD_data["fit_GP_A"]),
            self.LatexFormat(self._GD_data["fit_GP_A_err"])
        ), fontsize=self._plot_fontsize)
    ax.set_xlabel("Peak", fontsize=self._plot_fontsize)
    ax.set_ylabel("$\\log{\\frac{dP}{dPH}}$", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotSigmaEst(self, ax):
    """Plot extracted peak variances vs peak number with the linear variance fit."""
    ax.set_title("Estimated Peak Variance \n $\\sigma^{{2}}_{{0}}$={0} $\pm$ {1} \n $\\sigma^{{2}}_{{1}}$={2} $\pm$ {3}".format(
        self.LatexFormat(self._GD_data["fit_var0"]),
        self.LatexFormat(self._GD_data["fit_var0_err"]),
        self.LatexFormat(self._GD_data["fit_var1"]),
        self.LatexFormat(self._GD_data["fit_var1_err"]),
    ), fontsize=self._plot_fontsize)
    ax.errorbar(self._peak_data["n_peak_s"],
                self._peak_data["peak_var_s"],
                yerr=self._peak_data["peak_var_err_s"],
                label="Extracted Variances",
                fmt="o")
    # Linear model: variance = var1 * n + var0.
    ax.plot(self._peak_data["n_peak_s"],
            self.DummyLinear(self._peak_data["n_peak_s"],
                             self._GD_data["fit_var1"],
                             self._GD_data["fit_var0"]
                             ),
            linestyle="--",
            lw=2.5,
            color="red",
            label="Fit")
    ax.set_xlabel("Peak", fontsize=self._plot_fontsize)
    ax.set_ylabel("$\sigma^{2}_{peak}$", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotGainEst(self, ax):
    """Plot extracted peak means vs peak number with the linear gain fit."""
    ax.set_title("Estimated Gain \n $G$={0} $\pm$ {1} \n $PH_{{0}}$={2} $\pm$ {3}".format(
        self.LatexFormat(self._GD_data["fit_G"]),
        self.LatexFormat(self._GD_data["fit_G_err"]),
        self.LatexFormat(self._GD_data["fit_x_0"]),
        self.LatexFormat(self._GD_data["fit_x_0_err"]),
    ), fontsize=self._plot_fontsize)
    ax.errorbar(self._peak_data["n_peak_s"],
                self._peak_data["peak_mean_s"],
                yerr=self._peak_data["peak_mean_err_s"],
                label="Extracted Peak Means",
                fmt="o")
    # Linear model: mean = G * n + x_0.
    ax.plot(self._peak_data["n_peak_s"],
            self.DummyLinear(self._peak_data["n_peak_s"],
                             self._GD_data["fit_G"],
                             self._GD_data["fit_x_0"]
                             ),
            linestyle="--",
            lw=2.5,
            color="red",
            label="Fit")
    ax.set_xlabel("Peak", fontsize=self._plot_fontsize)
    ax.set_ylabel("$<Peak>$", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
def PlotDCREst(self, ax):
    """Plot the dark-count KDE with the hatched integration regions used
    for the DCR estimate, plus its 95% confidence band."""
    x_dc_kde = self._DCR_data["x_dc_kde"]
    y_dc_kde = self._DCR_data["y_dc_kde"]
    y_dc_kde_err = self._DCR_data["y_dc_kde_err"]
    cond_p_0toLM = self._DCR_data["cond_p_0toLM"]
    cond_p_LMto2LM = self._DCR_data["cond_p_LMtoL2M"]
    cond_interpeak = self._DCR_data["cond_interpeak"]
    int_p_0toLM = self._DCR_data["int_p_0toLM"]
    dint_p_0toLM = self._DCR_data["dint_p_0toLM"]
    int_p_LMto2LM = self._DCR_data["int_p_LMto2LM"]
    dint_p_LMto2LM = self._DCR_data["dint_p_LMto2LM"]
    mean_p_interpeak = self._DCR_data["mean_p_interpeak"]
    dmean_p_interpeak = self._DCR_data["dmean_p_interpeak"]
    # Hatched region: integral from 0 to the local minimum.
    ax.fill_between(
        x_dc_kde[cond_p_0toLM],
        y_dc_kde[cond_p_0toLM],
        color='none',
        hatch="///",
        edgecolor="b",
        label=r"$I_{{0 \rightarrow x_{{min}}}}$ = {:s} $\pm$ {:s}".format(self.LatexFormat(int_p_0toLM),
                                                                         self.LatexFormat(dint_p_0toLM))
    )
    # Hatched region: integral from the local minimum to twice its position.
    ax.fill_between(
        x_dc_kde[cond_p_LMto2LM],
        y_dc_kde[cond_p_LMto2LM],
        color='none',
        hatch="///",
        edgecolor="r",
        label=r"$I_{{x_{{min}} \rightarrow 2x_{{min}}}}$ = {:s} $\pm$ {:s}".format(self.LatexFormat(int_p_LMto2LM),
                                                                                  self.LatexFormat(dint_p_LMto2LM))
    )
    # Cross-hatched region: inter-peak average density.
    ax.fill_between(
        x_dc_kde[cond_interpeak],
        y_dc_kde[cond_interpeak],
        color='none',
        hatch="xxx",
        edgecolor="g",
        label=r"$<I_{{x_{{min}} - x_{{lim}} \rightarrow x_{{min}} + x_{{lim}} }}>$ = {:s} $\pm$ {:s}".format(
            self.LatexFormat(mean_p_interpeak),
            self.LatexFormat(dmean_p_interpeak))
    )
    ax.plot(x_dc_kde,
            y_dc_kde,
            linestyle="-",
            color="purple",
            label="KDE, 95% Confidence"
            )
    # NOTE(review): curve and band share the same legend label, so the
    # legend shows a duplicate "KDE, 95% Confidence" entry.
    ax.fill_between(x_dc_kde,
                    y1=y_dc_kde - 1.96*y_dc_kde_err,
                    y2=y_dc_kde + 1.96*y_dc_kde_err,
                    alpha=0.2,
                    color="purple",
                    label="KDE, 95% Confidence"
                    )
    ax.set_title("Dark Count Rate \n DCR = {0} $\pm$ {1}".format(
        self.LatexFormat(self._DCR_data["fit_DCR"]),
        self.LatexFormat(self._DCR_data["fit_DCR_err"])),
        fontsize=self._plot_fontsize)
    ax.set_xlabel("$PH_{norm, est}$", fontsize=self._plot_fontsize)
    ax.set_ylabel("$\\frac{dp_{DCR}}{PH_{norm, est}}$", fontsize=self._plot_fontsize)
    ax.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax.tick_params(axis="y", labelsize=self._plot_fontsize)
    ax.legend(fontsize=max(10, self._plot_fontsize//1.5))
    ax.set_yscale("log")
def PlotFit(self,
            figsize=(10.5, 10.5),
            labelsize=None,
            ticksize=None,
            titlesize=None,
            legendsize=None,
            title=None,
            xlabel=None,
            scaled=False,
            save_directory=None,
            y_limits=[None, None],
            residual_limits=[-5, 5],
            linewidth_main=5,
            x_limits=[None, None],
            display=True
            ):
    """Plot the data against the fitted model (top panel, log scale) and the
    normalised residuals (bottom panel, shared x-axis).

    Args:
        scaled: if True, plot in IQR-normalised units via GetModelIQR,
            otherwise in the original charge units via GetModel.
        save_directory: optional path; when given the figure is saved there.
        display: when True the figure window is shown.
        Remaining arguments control figure size, fonts, axis limits and titles.
    """
    # Fall back to the instance-wide font size for any unset size option.
    if(labelsize is None):
        labelsize = self._plot_fontsize
    if(ticksize is None):
        ticksize = self._plot_fontsize
    if(titlesize is None):
        titlesize = self._plot_fontsize
    if(legendsize is None):
        legendsize = max(10, self._plot_fontsize//1.5)
    if(scaled):
        x = self._GD_data["x_s"]
        y = self._GD_data["y_s"]
        y_err = self._GD_data["y_err_s"]
        y_hat = self.GetModelIQR(self._GD_data["x_s"])
        if(xlabel is None):
            xlabel = "IQR Normalised Charge Units [Q.U.]"
    else:
        x = self._GD_data["x"]
        y = self._GD_data["y"]
        y_err = self._GD_data["y_err"]
        y_hat = self.GetModel(self._GD_data["x"])
        if(xlabel is None):
            xlabel = "Input Charge Units [Q.U.]"
    fig = plt.figure(figsize=figsize)
    # 2:1 split between the spectrum panel and the residual panel.
    gs = gridspec.GridSpec(2, 1, height_ratios=[2, 1])
    ax0 = plt.subplot(gs[0])
    ax0.plot(x, y, label="Data", lw=5, color="C0")
    ax0.plot(x, y_hat, linestyle="--", label="Model", lw=5, color="C1")
    ax0.set_yscale("log")
    ax0.set_xlabel("$\\frac{dp}{dPH}$", fontsize=labelsize)
    ax0.tick_params(axis="x", labelsize=ticksize)
    ax0.tick_params(axis="y", labelsize=ticksize)
    ax0.set_ylim(y_limits)
    ax0.set_xlim(x_limits)
    if(title is not None):
        ax0.set_title(title, fontsize=titlesize)
    ax0.legend(fontsize=legendsize)
    # Residual panel: (data - model) in units of the data uncertainty.
    ax1 = plt.subplot(gs[1], sharex=ax0)
    ax1.scatter(x, (y - y_hat)/(y_err), color="C1")
    ax1.tick_params(axis="x", labelsize=ticksize)
    ax1.tick_params(axis="y", labelsize=ticksize)
    ax1.set_xlabel(xlabel, fontsize=labelsize)
    ax1.set_ylim(residual_limits)
    ax1.axhline(y=-1.0, color="purple", lw=2.5, linestyle="--")
    ax1.axhline(y=1.0, color="purple", lw=2.5, linestyle="--")
    ax1.tick_params(axis="x", labelsize=self._plot_fontsize)
    ax1.tick_params(axis="y", labelsize=self._plot_fontsize)
    fig.tight_layout()
    fig.subplots_adjust(hspace=.0)
    plt.pause(0.01)
    if(save_directory is not None):
        print("Saving figure to {0}...".format(save_directory))
        fig.savefig(save_directory)
    if(display):
        fig.show()
def PlotSummary(self, save_directory=None, display=True):
    """Render the seven-panel diagnostic summary of the pre-fit stages.

    Args:
        save_directory: optional path; when given the figure is saved there.
        display: show the figure window when True. (New keyword with a
            backward-compatible default; the original referenced an
            undefined `display` name, raising NameError.)
    """
    fig = plt.figure(figsize=(20, 40))
    gs = gridspec.GridSpec(4, 2)
    gs.update(wspace=0.5, hspace=0.5)
    ax0 = fig.add_subplot(gs[0, 0])
    self.PlotOriginalHistogram(ax0)
    ax1 = fig.add_subplot(gs[0, 1])
    self.PlotKDE(ax1)
    # The peak-finding panel spans the full width of the second row.
    ax2 = fig.add_subplot(gs[1, :])
    self.PlotPeaks(ax2)
    ax3 = fig.add_subplot(gs[2, 0])
    self.PlotGenPois(ax3)
    ax4 = fig.add_subplot(gs[2, 1])
    self.PlotSigmaEst(ax4)
    ax5 = fig.add_subplot(gs[3, 0])
    self.PlotGainEst(ax5)
    ax6 = fig.add_subplot(gs[3, 1])
    self.PlotDCREst(ax6)
    if(save_directory is not None):
        print("Saving figure to {0}...".format(save_directory))
        fig.savefig(save_directory)
    if(display):
        fig.show()
def GetModel(self, x):
    """Evaluate the fitted model at x in the original charge units.

    Requires a completed fit: self._fit_values must hold exactly the keyword
    arguments of DRM.
    """
    return self.DRM(x, **self._fit_values)
def GetModelIQR(self, x):
    """Evaluate the fitted model at x in IQR-normalised units.

    NOTE(review): relies on self._fit_values_s, which is not initialised in
    Init() — confirm it is populated during the fitting stage.
    """
    return self.DRM(x, **self._fit_values_s)
def GetOriginalDistribution(self):
    """Return the (x, y) arrays of the binned input spectrum in original units.

    BUG FIX: the original read `_self._GD_data`, an undefined name, raising
    NameError on every call.
    """
    return self._GD_data["x"], self._GD_data["y"]
def GetFitResults(self):
    """Return (fit values dict, fit errors dict) from the last fit."""
    return self._fit_values, self._fit_errors
def Failed(self):
    """Return the fail flag set when KDE/peak-finding/fitting aborted."""
    return self._failed
## PREFIT
def GetHistAndPeaks(self,
                    data,
                    bw,
                    peak_dist_factor,
                    peak_width_factor,
                    pedestal,
                    prominence,
                    bin_method,
                    n_bootstrap,
                    n_bootstrap_kde,
                    bandwidth_kde,
                    kernel_kde,
                    min_n_peak_evts):
    """Pre-fit stage: histogram the data (raw and IQR-scaled), build a
    bootstrap KDE, find photoelectron peaks and derive moment-based
    estimates of mu, lambda, gain and pedestal position.

    Results are stored in self._peak_data and self._GD_data; on failure
    self._failed is set and the method returns early.
    """
    # Robust (median/IQR) scaling so bin-width heuristics behave well.
    self.scaler = RobustScaler()
    data_s = np.squeeze(self.scaler.fit_transform(data.reshape(-1, 1)))
    pedestal_s = (pedestal - self.scaler.center_[0])/self.scaler.scale_[0]
    if(bw is None):
        # Choose the bin width on the scaled data with the requested rule.
        if(bin_method == "knuth"):
            bw_s, bins_s = knuth_bin_width(data_s, return_bins=True)
        elif(bin_method == "freedman"):
            bw_s, bins_s = freedman_bin_width(data_s, return_bins=True)
        elif(bin_method == "scott"):
            bw_s, bins_s = scott_bin_width(data_s, return_bins=True)
        else:
            raise Exception("Please specify a bin width by setting 'bw' or choose a method from 'knuth', 'freedman' or 'scott'")
        nbins_s = len(bins_s)
        bw = bw_s*self.scaler.scale_[0]
        nbins = np.ceil((data.max() - data.min()) / bw)
        nbins = max(1, nbins)
        bins = data.min() + bw * np.arange(nbins + 1)
    else:
        # Explicit bin width given in original units; derive the scaled one.
        bw_s = bw/self.scaler.scale_[0]
        nbins = np.ceil((data.max() - data.min()) / bw)
        nbins = max(1, nbins)
        bins = data.min() + bw * np.arange(nbins + 1)
        nbins_s = np.ceil((data_s.max() - data_s.min()) / bw_s)
        nbins_s = max(1, nbins_s)
        bins_s = data_s.min() + bw_s * np.arange(nbins_s + 1)
    # Calculate histogram information (densities and Poisson-style errors).
    y_N, x = np.histogram(data, bins=bins, density=False)
    N = np.sum(y_N)
    y = y_N/N/bw
    y_err = y*np.sqrt(1/y_N + 1/N)
    x = (x[:-1] + x[1:])/2.
    y_N_s, x_s = np.histogram(data_s, bins=bins_s, density=False)
    N_s = np.sum(y_N_s)
    y_s = y_N_s/N_s/bw_s
    y_err_s = y_s*np.sqrt(1/y_N_s + 1/N_s)
    x_s = (x_s[:-1] + x_s[1:])/2.
    try:
        x_kde_s, y_kde_s, y_kde_err_s, _, = self.BootstrapKDE(data_s,
                                                              kernel=kernel_kde,
                                                              bw=bandwidth_kde,
                                                              n_samples=n_bootstrap_kde)
        # Maps scaled charge to (fractional) KDE grid index.
        f_lin_kde_s = interp1d(x_kde_s, np.arange(len(x_kde_s)), fill_value='extrapolate')
    except Exception:
        # BUG FIX: was a bare `except:` that also swallowed KeyboardInterrupt.
        print("Initial KDE failed. Setting fit fail status and continuing...")
        self._failed = True
        return
    # Bootstrap the moment-based estimators on pedestal-subtracted data.
    est_mu, std_err_mu, _ = self.Bootstrap(data_s - pedestal_s,
                                           self.EstMu,
                                           n_samples=n_bootstrap)
    est_lbda, std_err_lbda, _ = self.Bootstrap(data_s - pedestal_s,
                                               self.EstLbda,
                                               n_samples=n_bootstrap)
    est_gain, std_err_gain, _ = self.Bootstrap(data_s - pedestal_s,
                                               self.EstGain,
                                               n_samples=n_bootstrap)
    # Estimated gain expressed in KDE grid-index units, for peak spacing.
    est_gain_lin = abs(f_lin_kde_s(est_gain) - f_lin_kde_s(0))
    peaks, properties = find_peaks(y_kde_s,
                                   distance=peak_dist_factor*est_gain_lin,
                                   prominence=prominence
                                   )
    if(len(peaks) < 3):
        if(self._verbose):
            print("Not enough peaks found with distance criterion to fit. Removing distance criterion and finding all peaks...")
        peaks, properties = find_peaks(y_kde_s,
                                       prominence=prominence
                                       )
    # Threshold below which peaks are considered pre-pedestal artefacts.
    limit_s = pedestal_s - 0.5*est_gain
    limit = pedestal - 0.5*est_gain*self.scaler.scale_[0]
    cond_limit_s = (x_kde_s[peaks] > limit_s)
    if(sum(cond_limit_s) < 3):
        if(self._verbose):
            print("No peaks observed above pedestal - gain/2. Continuing without thresholding...")
    else:
        peaks = peaks[cond_limit_s]
    x_peaks_s = x_kde_s[peaks]
    y_peaks_s = y_kde_s[peaks]
    y_peaks_err_s = y_kde_err_s[peaks]
    # Convert the peak-width index positions back to charge units.
    temp_widths = np.asarray(peak_widths(y_kde_s, peaks, rel_height=peak_width_factor))
    temp_widths[2:] = x_kde_s[temp_widths[2:].astype(int)]
    x_widths_s = temp_widths[3] - temp_widths[2]
    self._peak_data["x_peak_s"] = []
    self._peak_data["y_peak_s"] = []
    self._peak_data["y_peak_err_s"] = []
    self._peak_data["n_peak_s"] = []
    self._peak_data["x_width_s"] = []
    n_valid_peaks = 0
    # Keep only peaks with enough events inside their width window.
    for x_i, (y_peak, y_peak_err, x_peak, x_width) in enumerate(zip(y_peaks_s, y_peaks_err_s, x_peaks_s, x_widths_s)):
        N_evt_peak = len(self.SelectRange(data_s, x_peak - x_width/2, x_peak + x_width/2))
        if(N_evt_peak < min_n_peak_evts):
            if(self._verbose):
                print("Peak {:d} has {:d} events. Less than event threshold {:d}. Ignoring...".format(x_i,
                                                                                                      N_evt_peak,
                                                                                                      min_n_peak_evts))
        else:
            if(self._verbose):
                # BUG FIX: the original passed a third, unused format argument.
                print("Peak {:d} has {:d} events. Adding...".format(x_i,
                                                                    N_evt_peak))
            self._peak_data["x_peak_s"].append(x_peak)
            self._peak_data["y_peak_s"].append(y_peak)
            self._peak_data["y_peak_err_s"].append(y_peak_err)
            self._peak_data["n_peak_s"].append(x_i)
            self._peak_data["x_width_s"].append(x_width)
            n_valid_peaks += 1
    self._peak_data["x_peak_s"] = np.asarray(self._peak_data["x_peak_s"])
    self._peak_data["y_peak_s"] = np.asarray(self._peak_data["y_peak_s"])
    self._peak_data["y_peak_err_s"] = np.asarray(self._peak_data["y_peak_err_s"])
    self._peak_data["n_peak_s"] = np.asarray(self._peak_data["n_peak_s"])
    self._peak_data["x_width_s"] = np.asarray(self._peak_data["x_width_s"])
    if(n_valid_peaks < 3):
        print("Minimum 3 peaks covering a threshold {0} events required. Too few events in peaks to proceed. Setting fit fail status and continuing...".format(min_n_peak_evts))
        self._failed = True
        return
    elif(self._peak_data["n_peak_s"][0] > 0):
        if(self._verbose):
            # BUG FIX: the original never called .format(), printing a
            # literal "{0}" placeholder.
            print("Estimated Pedestal had fewer than threshold {0} events. Shifting the pedestal forward...".format(min_n_peak_evts))
        # Re-index so the first retained peak becomes the pedestal (index 0).
        self._peak_data["n_peak_s"] = self._peak_data["n_peak_s"] - self._peak_data["n_peak_s"][0]
    self._GD_data["x"] = x
    self._GD_data["y"] = y
    self._GD_data["y_err"] = y_err
    self._GD_data["bw"] = bw
    self._GD_data["N"] = N
    self._GD_data["nbins"] = nbins
    self._GD_data["x_s"] = x_s
    self._GD_data["y_s"] = y_s
    self._GD_data["y_err_s"] = y_err_s
    self._GD_data["x_kde_s"] = x_kde_s
    self._GD_data["y_kde_s"] = y_kde_s
    self._GD_data["y_kde_err_s"] = y_kde_err_s
    self._GD_data["bw_s"] = bw_s
    self._GD_data["N_s"] = N_s
    self._GD_data["nbins_s"] = nbins_s
    self._GD_data["fit_GP_mu"] = max(est_mu, self._eps)
    self._GD_data["fit_GP_mu_err"] = std_err_mu
    self._GD_data["fit_GP_lbda"] = max(est_lbda, self._eps)
    self._GD_data["fit_GP_lbda_err"] = std_err_lbda
    self._GD_data["fit_G"] = est_gain
    self._GD_data["fit_G_err"] = std_err_gain
    self._GD_data["fit_x_0"] = self._peak_data["x_peak_s"][0]
    self._GD_data["fit_x_0_err"] = self._peak_data["x_width_s"][0]
    self._GD_data["scaler"] = self.scaler
    self._GD_data["limit_s"] = limit_s
def PreFitGenPoisToPeaks(self, ppf_mainfit, n_bootstrap):
    """Refine the Generalised Poisson (GP) parameters by fitting the peak heights.

    Fits log(A) + log GP(k; mu, lbda) to the log of the detected peak heights
    with Minuit, starting from the bootstrap estimates stored in self._GD_data.
    On a valid fit the refined mu, lbda and normalisation A (with errors)
    replace the estimates; on an invalid fit the original estimates are kept.
    Finally the peak-index fit range [k_low, k_hi] for the main fit is derived
    from the GP percent-point function and clamped to [2, self._max_n_peaks].

    Parameters
    ----------
    ppf_mainfit : float
        Percentile (0-1) of the GP distribution defining the highest peak
        index included in the main fit (via self.GPoisPPF).
    n_bootstrap : int
        Not referenced in this method; kept for a uniform PreFit* signature.
    """
    ########
    #STEP 1
    ########
    # Prefit estimates from the peak-finding/bootstrap stage.
    # (fit_x_0 and x_peak are loaded but not referenced below.)
    fit_lbda = self._GD_data["fit_GP_lbda"]
    fit_dlbda = self._GD_data["fit_GP_lbda_err"]
    fit_x_0 = self._GD_data["fit_x_0"]
    fit_mu = self._GD_data["fit_GP_mu"]
    fit_dmu = self._GD_data["fit_GP_mu_err"]
    x_peak = self._peak_data["x_peak_s"]
    n_peak = self._peak_data["n_peak_s"]
    y_peak = self._peak_data["y_peak_s"]
    y_peak_err = self._peak_data["y_peak_err_s"]
    # Fit in log space; dlog(y) = dy/y by standard error propagation.
    log_y_peak = self.Logify(y_peak)
    dlog_y_peak = y_peak_err/y_peak
    # Initial normalisation A: mean offset between the measured log peak
    # heights and the GP log-pmf evaluated with the prefit parameters.
    fit_A_estspec = gpoisson.logpmf(n_peak, fit_mu, fit_lbda)
    fit_A = np.nanmean(log_y_peak - fit_A_estspec)
    fit_dA = np.nanmean(2*dlog_y_peak)
    fit_dict_pois = {
        "A": fit_A,
        "mu":fit_mu,
        "lbda":fit_lbda,
    }
    # Chi-square regression of DummyLogGPois(k; A, mu, lbda) to the log heights.
    fit_pois = Chi2Regression(
        self.DummyLogGPois,
        n_peak,
        log_y_peak,
        dlog_y_peak
    )
    minuit_pois = Minuit(fit_pois, **fit_dict_pois)
    # NOTE(review): the cost is a chi-square regression but errordef is set to
    # LIKELIHOOD (0.5); confirm Chi2Regression's scaling — LEAST_SQUARES (1.0)
    # may be intended, otherwise parameter errors come out scaled by 1/sqrt(2).
    minuit_pois.errordef=Minuit.LIKELIHOOD
    minuit_pois.strategy=2
    # Seed the step sizes with the bootstrap errors.
    minuit_pois.errors["mu"] = fit_dmu
    minuit_pois.errors["lbda"] = fit_dlbda
    # lbda is a branching probability -> constrain to (0, 1); mu > 0.
    minuit_pois.limits["lbda"] = (self._eps, 1-self._eps)
    minuit_pois.limits["mu"] = (self._eps, None)
    minuit_pois.migrad(ncall= self._n_call_minuit,
                       iterate =self._n_iterations_minuit)
    minuit_pois.hesse()
    if(not minuit_pois.valid):
        # Fall back to the prefit estimates if Minuit did not converge.
        if(self._verbose):
            print('Minuit returned invalid for Generalised Poisson fit to peaks. Using basic estimated parameters instead...')
        self._GD_data["fit_GP_mu"] = fit_mu
        self._GD_data["fit_GP_mu_err"] = fit_dmu
        self._GD_data["fit_GP_lbda"] = fit_lbda
        self._GD_data["fit_GP_lbda_err"] = fit_dlbda
        self._GD_data["fit_GP_A"] = fit_A
        self._GD_data["fit_GP_A_err"] = fit_dA
    else:
        self._GD_data["fit_GP_mu"] = minuit_pois.values["mu"]
        self._GD_data["fit_GP_mu_err"] = minuit_pois.errors["mu"]
        self._GD_data["fit_GP_lbda"] = minuit_pois.values["lbda"]
        self._GD_data["fit_GP_lbda_err"] = minuit_pois.errors["lbda"]
        self._GD_data["fit_GP_A"] = minuit_pois.values["A"]
        self._GD_data["fit_GP_A_err"] = minuit_pois.errors["A"]
    # Peak-index range for the main fit: k_hi is the ppf_mainfit percentile
    # of the fitted GP, capped at self._max_n_peaks inside GPoisPPF.
    k_low = 0
    k_hi = self.GPoisPPF(ppf_mainfit,
                         self._GD_data["fit_GP_mu"],
                         self._GD_data["fit_GP_lbda"],
                         self._max_n_peaks
                         )
    self._GD_data["fit_GP_k_low"] = k_low
    self._GD_data["fit_GP_k_hi"] = k_hi
    # Clamp k_hi to [2, self._max_n_peaks] so the main fit always has a
    # workable number of peaks.
    if(self._GD_data["fit_GP_k_hi"]<2):
        if(self._verbose):
            print("Too few primary Geiger peaks to fit. Artificially increasing max number of peaks from {:d} to 2...".format(self._GD_data["fit_GP_k_hi"]))
        self._GD_data["fit_GP_k_hi"] = max(self._GD_data["fit_GP_k_hi"], 2)
    elif(self._GD_data["fit_GP_k_hi"]>self._max_n_peaks):
        if(self._verbose):
            print("Too many primary Geiger peaks to fit. Artificially decreasing max number of peaks from {:d} to {:d}...".format(self._GD_data["fit_GP_k_hi"],
                                                                                                                                  self._max_n_peaks))
        self._GD_data["fit_GP_k_hi"] = min(self._GD_data["fit_GP_k_hi"], self._max_n_peaks)
def PreFitSigmaAndGain(self, data, n_bootstrap):
    """Estimate the peak widths (sigma_0, sigma_1) and refine the gain/pedestal.

    For each detected peak, the events within +/- half the peak width are
    bootstrap-resampled to estimate the peak mean and variance (with errors).
    Two robust (Huber) linear regressions in peak number k are then performed:

        variance(k) = m*k + c  ->  sigma_0 = sqrt(c), sigma_1 = sqrt(m)
        mean(k)     = m*k + c  ->  refined gain G = m, pedestal x_0 = c

    All values and errors are written into self._GD_data / self._peak_data.

    Parameters
    ----------
    data : np.ndarray
        Charge values (unscaled); transformed with self.scaler here.
    n_bootstrap : int
        Number of bootstrap resamples per peak for the mean/variance estimates.
    """
    # (peak_val is loaded but not referenced below.)
    peak_val = self._peak_data["n_peak_s"]
    # Work in the robust-scaled coordinate system used for peak finding.
    data_s = np.squeeze(self.scaler.transform(data.reshape(-1,1)))
    self._peak_data["peak_mean_s"]=[]
    self._peak_data["peak_mean_err_s"]=[]
    self._peak_data["peak_var_s"]=[]
    self._peak_data["peak_var_err_s"]=[]
    # Per-peak bootstrap of mean and variance inside the peak window
    # [x_peak - width/2, x_peak + width/2].
    for x_i, y_peak, x_peak, x_width in zip(self._peak_data["n_peak_s"],
                                            self._peak_data["y_peak_s"],
                                            self._peak_data["x_peak_s"],
                                            self._peak_data["x_width_s"]):
        data_range = self.SelectRange(data_s, x_peak - x_width/2, x_peak + x_width/2)
        est_mean, std_err_mean, _ = self.Bootstrap(data_range,
                                                   np.mean,
                                                   n_samples=n_bootstrap)
        est_var, std_err_var, _ = self.Bootstrap(data_range,
                                                 np.var,
                                                 n_samples=n_bootstrap)
        self._peak_data["peak_mean_s"].append(est_mean)
        self._peak_data["peak_mean_err_s"].append(std_err_mean)
        self._peak_data["peak_var_s"].append(est_var)
        self._peak_data["peak_var_err_s"].append(std_err_var)
    self._peak_data["peak_mean_s"] = np.asarray(self._peak_data["peak_mean_s"])
    self._peak_data["peak_mean_err_s"] = np.asarray(self._peak_data["peak_mean_err_s"])
    self._peak_data["peak_var_s"] = np.asarray(self._peak_data["peak_var_s"])
    self._peak_data["peak_var_err_s"] = np.asarray(self._peak_data["peak_var_err_s"])
    n_peaks_select = self._peak_data["n_peak_s"]
    peaks_var_select = self._peak_data["peak_var_s"]
    peaks_var_err_select = self._peak_data["peak_var_err_s"]
    peaks_mean_select = self._peak_data["peak_mean_s"]
    peaks_mean_err_select = self._peak_data["peak_mean_err_s"]
    # Robust linear fit of peak variance vs peak number.
    fit_lin_var = HuberRegression(self.DummyLinear,
                                  n_peaks_select,
                                  peaks_var_select,
                                  peaks_var_err_select
                                  )
    # Seeds: slope = median finite-difference gradient; intercept = smallest
    # observed peak variance.
    m_var_temp = np.median(np.gradient(peaks_var_select, n_peaks_select))
    c_var_temp = peaks_var_select[np.argmin(peaks_var_select)]
    fit_dict_lin_var = {
        "m": m_var_temp,
        "c": c_var_temp,
    }
    minuit_lin_var = Minuit(fit_lin_var, **fit_dict_lin_var)
    # The variance must grow with peak number -> positive slope only.
    minuit_lin_var.limits["m"] = (self._eps, None)
    minuit_lin_var.strategy=2
    minuit_lin_var.migrad(ncall= self._n_call_minuit,
                          iterate =self._n_iterations_minuit)
    minuit_lin_var.hesse()
    # Robust linear fit of peak mean vs peak number, seeded with the
    # current gain and pedestal estimates.
    fit_lin_G = HuberRegression(self.DummyLinear,
                                n_peaks_select,
                                peaks_mean_select,
                                peaks_mean_err_select
                                )
    m_G_temp = self._GD_data["fit_G"]
    c_G_temp = self._GD_data["fit_x_0"]
    fit_dict_lin_G = {
        "m": m_G_temp,
        "c": c_G_temp,
    }
    minuit_lin_G = Minuit(fit_lin_G, **fit_dict_lin_G)
    # The gain (slope) must be positive.
    minuit_lin_G.limits["m"] = (self._eps, None)
    minuit_lin_G.strategy=2
    minuit_lin_G.migrad(ncall= self._n_call_minuit,
                        iterate =self._n_iterations_minuit)
    minuit_lin_G.hesse()
    # Variance fit: intercept -> pedestal width sigma_0^2, slope -> per-cell
    # width sigma_1^2; errors propagated via d(sqrt(v)) = dv/(2*sqrt(v)).
    self._GD_data["fit_var0"] = minuit_lin_var.values["c"]
    self._GD_data["fit_var0_err"] = minuit_lin_var.errors["c"]
    self._GD_data["fit_var1"] = minuit_lin_var.values["m"]
    self._GD_data["fit_var1_err"] = minuit_lin_var.errors["m"]
    self._GD_data["fit_sigma0"] = np.sqrt(self._GD_data["fit_var0"])
    self._GD_data["fit_sigma0_err"] = (0.5/self._GD_data["fit_sigma0"])*self._GD_data["fit_var0_err"]
    self._GD_data["fit_sigma1"] = np.sqrt(self._GD_data["fit_var1"])
    self._GD_data["fit_sigma1_err"] = (0.5/self._GD_data["fit_sigma1"])*self._GD_data["fit_var1_err"]
    # Mean fit: slope -> refined gain, intercept -> refined pedestal.
    self._GD_data["fit_G"] = minuit_lin_G.values["m"]
    self._GD_data["fit_G_err"] = minuit_lin_G.errors["m"]
    self._GD_data["fit_x_0"] = minuit_lin_G.values["c"]
    self._GD_data["fit_x_0_err"] = minuit_lin_G.errors["c"]
def PreFitDCR(self,
              data,
              tau,
              tgate,
              r_fast,
              t_0,
              ppf_mainfit,
              kernel_kde,
              bandwidth_kde,
              n_bootstrap_kde,
              dcr_lim=0.05):
    """Estimate the dark count rate (DCR) from the inter-peak region.

    The charge spectrum is normalised to photo-electron (p.e.) units with the
    prefit pedestal and gain, a bootstrapped KDE is evaluated strictly between
    the 0 p.e. and 1 p.e. peaks, and the DCR is estimated from the mean
    density in a window around the inter-peak minimum divided by the
    integrated density of the 0 p.e. region. The dark-peak fit range
    [k_dcr_low, k_dcr_hi] is then derived from the Generalised Poisson
    percent-point function and clamped to [2, self._max_n_dcr_peaks].
    All results are written into self._DCR_data.

    Parameters
    ----------
    data : np.ndarray
        Charge values (unscaled).
    tau : float
        Slow component time constant of the SiPM pulse (ns).
    tgate : float
        Integration gate length (ns).
    r_fast : float or None
        Fraction of the fast pulse component; stored as 0 when None.
    t_0 : float
        Integration time before the gate (ns).
    ppf_mainfit : float
        Percentile (0-1) selecting the highest dark-peak index to fit.
    kernel_kde : str
        Kernel passed to the bootstrapped KDE.
    bandwidth_kde : float or str
        Bandwidth (or bandwidth selection rule) for the KDE.
    n_bootstrap_kde : int
        Number of bootstrap samples for the KDE error band.
    dcr_lim : float, optional
        Half-width (in p.e.) of the averaging window around the inter-peak
        minimum. Default 0.05.

    Sets self._failed and returns early if the KDE bootstrap fails.
    """
    G = self._GD_data["fit_G"]
    x_0 = self._GD_data["fit_x_0"]
    # Normalise to p.e. units: (scaled charge - pedestal) / gain.
    data_norm = (np.squeeze(self.scaler.transform(data.reshape(-1,1))) - x_0)/G
    try:
        x_dc_kde, y_dc_kde, y_dc_kde_err, _ = self.BootstrapKDE(
            data_norm,
            kernel=kernel_kde,
            bw=bandwidth_kde,
            n_samples=n_bootstrap_kde
        )
    except Exception:
        # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit
        # are not swallowed; any KDE failure marks the fit as failed.
        print("DCR KDE failed. Setting fit fail status and continuing...")
        self._failed = True
        return
    # Keep only the region strictly between the 0 p.e. and 1 p.e. peaks.
    # (The original applied this identical cut a second time further down;
    # that redundant no-op pass has been removed.)
    cond_interpeak = (x_dc_kde>0) & (x_dc_kde<1)
    x_dc_kde = x_dc_kde[cond_interpeak]
    y_dc_kde = y_dc_kde[cond_interpeak]
    y_dc_kde_err = y_dc_kde_err[cond_interpeak]
    # Interpolators: position<->index (used to mirror intervals about the
    # minimum) and log-density with +/-1 sigma bands for error propagation.
    f_lin_kde_dc = interp1d(x_dc_kde, np.arange(0, len(x_dc_kde)), fill_value='extrapolate')
    f_lin_kde_dc_inv = interp1d(np.arange(0, len(x_dc_kde)), x_dc_kde, fill_value='extrapolate')
    f_log_kde_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde), fill_value='extrapolate')
    f_log_kde_upper_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde + y_dc_kde_err), fill_value='extrapolate')
    f_log_kde_lower_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde - y_dc_kde_err), fill_value='extrapolate')
    # Inter-peak minimum and the integration windows around it.
    x_dc_min = x_dc_kde[np.argmin(y_dc_kde)]
    x_dc_lim_min_low = x_dc_min - dcr_lim
    x_dc_lim_min_hi = x_dc_min + dcr_lim
    x_dc_lim_low = 0
    x_dc_lim_mid = x_dc_min
    # Mirror of x=0 about the minimum, constructed in index space.
    x_dc_lim_hi = f_lin_kde_dc_inv(f_lin_kde_dc(0) + 2*abs(f_lin_kde_dc(x_dc_min) - f_lin_kde_dc(0)))
    cond_interpeak = (x_dc_kde> x_dc_lim_min_low) & (x_dc_kde<=x_dc_lim_min_hi)
    cond_p_0toLM = (x_dc_kde> x_dc_lim_low) & (x_dc_kde<=x_dc_lim_mid)
    cond_p_LMto2LM = (x_dc_kde> x_dc_lim_mid) & (x_dc_kde<=x_dc_lim_hi)
    # Mean density in the +/- dcr_lim window around the minimum; the upper and
    # lower KDE bands give a symmetric error estimate.
    mean_p_interpeak = (quad(lambda x: np.exp(f_log_kde_dc(x)),
                             x_dc_lim_min_low,
                             x_dc_lim_min_hi)[0])/(2*dcr_lim)
    mean_p_interpeak_upper = (quad(lambda x: np.exp(f_log_kde_upper_dc(x)),
                                   x_dc_lim_min_low,
                                   x_dc_lim_min_hi)[0])/(2*dcr_lim)
    mean_p_interpeak_lower = (quad(lambda x: np.exp(f_log_kde_lower_dc(x)),
                                   x_dc_lim_min_low,
                                   x_dc_lim_min_hi)[0])/(2*dcr_lim)
    dmean_p_interpeak = abs(mean_p_interpeak_upper - mean_p_interpeak_lower)
    # Integrated density from the pedestal up to the minimum.
    int_p_0toLM = quad(lambda x: np.exp(f_log_kde_dc(x)),
                       x_dc_lim_low,
                       x_dc_lim_mid)[0]
    int_p_0toLM_upper = quad(lambda x: np.exp(f_log_kde_upper_dc(x)),
                             x_dc_lim_low,
                             x_dc_lim_mid)[0]
    int_p_0toLM_lower = quad(lambda x: np.exp(f_log_kde_lower_dc(x)),
                             x_dc_lim_low,
                             x_dc_lim_mid)[0]
    dint_p_0toLM = abs(int_p_0toLM_upper - int_p_0toLM_lower)
    # Integrated density over the mirrored interval above the minimum.
    int_p_LMto2LM = quad(lambda x: np.exp(f_log_kde_dc(x)),
                         x_dc_lim_mid,
                         x_dc_lim_hi)[0]
    int_p_LMto2LM_upper = quad(lambda x: np.exp(f_log_kde_upper_dc(x)),
                               x_dc_lim_mid,
                               x_dc_lim_hi)[0]
    int_p_LMto2LM_lower = quad(lambda x: np.exp(f_log_kde_lower_dc(x)),
                               x_dc_lim_mid,
                               x_dc_lim_hi)[0]
    dint_p_LMto2LM = abs(int_p_LMto2LM_upper - int_p_LMto2LM_lower)
    # Total 0 p.e. probability estimate: full [0, min] integral plus half of
    # the mirrored interval; errors added in quadrature with the same weight.
    int_p = int_p_0toLM + 0.5*int_p_LMto2LM
    dint_p = np.sqrt(dint_p_0toLM**2 + 0.5*dint_p_LMto2LM**2)
    # DCR estimate (times in ns -> rate in GHz); self._eps guards divide-by-zero.
    DCR = mean_p_interpeak/(max(int_p, self._eps)*4*tau)
    dDCR = DCR*np.sqrt((dmean_p_interpeak/max(mean_p_interpeak, self._eps))**2 + (dint_p/max(int_p, self._eps))**2)
    # Expected mean number of dark counts over the effective integration window.
    mu_dcr= DCR*(abs(t_0) + tgate)
    k_dcr_low = 0
    k_dcr_hi = self.GPoisPPF(ppf_mainfit,
                             mu_dcr,
                             self._GD_data["fit_GP_lbda"],
                             self._max_n_dcr_peaks
                             )
    self._DCR_data["x_dc_kde"] = x_dc_kde
    self._DCR_data["y_dc_kde"] = y_dc_kde
    self._DCR_data["y_dc_kde_err"] = y_dc_kde_err
    self._DCR_data["cond_p_0toLM"] = cond_p_0toLM
    # NOTE(review): key spelling "LMtoL2M" kept as-is — downstream consumers
    # may depend on it.
    self._DCR_data["cond_p_LMtoL2M"] = cond_p_LMto2LM
    self._DCR_data["cond_interpeak"] = cond_interpeak
    self._DCR_data["int_p_0toLM"] = int_p_0toLM
    self._DCR_data["dint_p_0toLM"] = dint_p_0toLM
    self._DCR_data["int_p_LMto2LM"] = int_p_LMto2LM
    self._DCR_data["dint_p_LMto2LM"] = dint_p_LMto2LM
    self._DCR_data["mean_p_interpeak"] = mean_p_interpeak
    self._DCR_data["dmean_p_interpeak"] = dmean_p_interpeak
    self._DCR_data["fit_tau"] = tau
    self._DCR_data["fit_tgate"] = tgate
    self._DCR_data["fit_t_0"] = t_0
    self._DCR_data["fit_DCR"] = DCR
    self._DCR_data["fit_DCR_err"] = dDCR
    self._DCR_data["fit_dcr_k_low"] = k_dcr_low
    self._DCR_data["fit_dcr_k_hi"] = k_dcr_hi
    if(r_fast is None):
        self._DCR_data["fit_r_fast"] = 0
    else:
        self._DCR_data["fit_r_fast"] = r_fast
    # Clamp the dark-peak count to [2, self._max_n_dcr_peaks]. The comparison
    # and message previously used self._max_n_peaks while the clamp used
    # self._max_n_dcr_peaks; the DCR cap is now used consistently, and the
    # prints are verbose-guarded like the primary-Geiger branch.
    if(self._DCR_data["fit_dcr_k_hi"]<2):
        if(self._verbose):
            print("Too few dark peaks to fit. Artificially increasing max number of peaks from {:d} to 2...".format(self._DCR_data["fit_dcr_k_hi"]))
        self._DCR_data["fit_dcr_k_hi"] = max(self._DCR_data["fit_dcr_k_hi"], 2)
    elif(self._DCR_data["fit_dcr_k_hi"]>self._max_n_dcr_peaks):
        if(self._verbose):
            print("Too many dark peaks to fit. Artificially decreasing max number of peaks from {:d} to {:d}...".format(self._DCR_data["fit_dcr_k_hi"],
                                                                                                                        self._max_n_dcr_peaks))
        self._DCR_data["fit_dcr_k_hi"] = min(self._DCR_data["fit_dcr_k_hi"], self._max_n_dcr_peaks)
def InitFit(self,
            data,
            tau,
            t_0,
            r_fast,
            tgate,
            pedestal,
            n_bootstrap,
            n_bootstrap_kde,
            bw,
            peak_dist_factor,
            peak_width_factor,
            prominence,
            min_n_peak_evts,
            kernel_kde,
            bandwidth_kde,
            ppf_mainfit,
            bin_method
            ):
    """Run the full pre-fit chain before the main spectrum fit.

    Executes, in order: histogram/peak finding, the Generalised Poisson fit
    to the peak heights, the peak-width/gain bootstrap fit, and the DCR
    estimate from the inter-peak region. Each stage is skipped as soon as
    any previous stage has flagged failure via self._failed.
    """
    if self._is_histogram:
        # Expand (value, count) histogram rows back into a flat charge list.
        data = np.array(list(chain.from_iterable([[x]*int(y) for x,y in zip(data[:,0], data[:,1])])))
    # Guard clauses: bail out the moment any stage reports failure.
    if self._failed:
        return
    if self._verbose:
        print("Getting histogram and peak values...")
    self.GetHistAndPeaks(data, bw, peak_dist_factor, peak_width_factor,
                         pedestal, prominence, bin_method, n_bootstrap,
                         n_bootstrap_kde, bandwidth_kde, kernel_kde,
                         min_n_peak_evts)
    if self._failed:
        return
    if self._verbose:
        print("Fitting Generalised Poisson distribution...")
    self.PreFitGenPoisToPeaks(ppf_mainfit, n_bootstrap)
    if self._failed:
        return
    if self._verbose:
        print("Fitting peak widths via bootstrap resampling...")
    self.PreFitSigmaAndGain(data, n_bootstrap)
    if self._failed:
        return
    if self._verbose:
        print("Estimating DCR from interpeak region...")
    self.PreFitDCR(data, tau, tgate, r_fast, t_0, ppf_mainfit,
                   kernel_kde, bandwidth_kde, n_bootstrap_kde)
    if self._failed:
        return
    if self._verbose:
        print("Prefitting complete.")
# FIT
def Fit(self,
data,
**kwargs):
kwargs_fit = self._default_fit_kwargs.copy()
kwargs_fit.update(kwargs)
if(not isinstance(data,np.ndarray)):
raise Exception("Data is in {0} format. Please provide data in the format of a NumPy array.".format(type(data)))
else:
if(data.ndim==1):
if(self._verbose):
print("Data is 1D. Assuming list of charges as input...")
self._is_histogram = False
elif(data.ndim==2):
if(self._verbose):
print("Data is 2D. Assuming histogram as input...")
self._is_histogram = True
else:
raise Exception("Please provide data in the format of either a list of charges (1D) or a histogram (2D)")
if(kwargs_fit["t_0"] is None):
raise Exception("t_0 is not set. Please set t_0 as a keyword argument to be the integration period before gate in nanoseconds.")
if(kwargs_fit["tgate"] is None):
raise Exception("tgate is not set. Please set tgate as a keyword argument to be the integration gate length in nanoseconds.")
if(kwargs_fit["tau"] is None):
raise Exception("tau is not set. Please set tau as a keyword argument to be the slow component of the SiPM pulse in nanoseconds.")
if(kwargs_fit["r_fast"] is None):
raise Exception("r_fast is not set. Please set r_fast as a keyword argument to be the fraction of the fast component of the SiPM pulse in nanoseconds.")
if(kwargs_fit["pedestal"] is None):
raise Exception("pedestal is not set. Please set pedestal to be the pedestal position in units of the input charge spectrum being fitted.")
print("Performing fit initialisation...")
self.Init()
self.InitFit(data, **kwargs_fit)
if(not self._failed):
print("Fitting...")
self.N = len(data)
G = self._GD_data["fit_G"]
dG = self._GD_data["fit_G_err"]
mu = self._GD_data["fit_GP_mu"]
dmu = self._GD_data["fit_GP_mu_err"]
lbda = self._GD_data["fit_GP_lbda"]
dlbda = self._GD_data["fit_GP_lbda_err"]
k_low = self._GD_data["fit_GP_k_low"]
k_hi = self._GD_data["fit_GP_k_hi"]
x_0 = self._GD_data["fit_x_0"]
dx_0 = self._GD_data["fit_x_0_err"]
sigma_0 = self._GD_data["fit_sigma0"]
sigma_1 = self._GD_data["fit_sigma1"]
dsigma_0 = self._GD_data["fit_sigma0_err"]
dsigma_1 = self._GD_data["fit_sigma1_err"]
dx = self._GD_data["bw_s"]
tau = self._DCR_data["fit_tau"]
tgate = self._DCR_data["fit_tgate"]
t_0 = self._DCR_data["fit_t_0"]
DCR = self._DCR_data["fit_DCR"]
dDCR = self._DCR_data["fit_DCR_err"]
r_fast = self._DCR_data["fit_r_fast"]
k_dcr_low = self._DCR_data["fit_dcr_k_low"]
k_dcr_hi =self._DCR_data["fit_dcr_k_hi"]
# beta is set to G/2 good guess
self.fit_dict = {
"mu":mu,
"G":G,
"x_0": x_0,
"sigma_0": sigma_0,
"sigma_1": sigma_1,
"alpha": 0.2,
"r_beta": 0.2,
"lbda":lbda,
"tau": tau,
"tgate":tgate,
"t_0": t_0,
"DCR":DCR,
"r_fast":r_fast,
"k_low":k_low,
"k_hi":k_hi,
"k_dcr_low":k_dcr_low,
"k_dcr_hi":k_dcr_hi,
"dx": dx,
}
bl = BinnedLH(self.DRM, self._GD_data["x_s"], self._GD_data["y_s"])
minuit = Minuit(bl, **self.fit_dict)
minuit.errors["alpha"] = 0.01
minuit.errors["r_beta"] = 0.01
if(not np.isnan(dmu) and dmu is not None):
minuit.errors["mu"] = abs(dmu)
if(not np.isnan(dG) and dG is not None):
minuit.errors["G"] = abs(dG)
if(not np.isnan(dx_0) and dx_0 is not None):
minuit.errors["x_0"] = abs(dx_0)
if(not np.isnan(dlbda) and dlbda is not None):
minuit.errors["lbda"] = abs(dlbda)
if(not np.isnan(dsigma_0) and dsigma_0 is not None):
minuit.errors["sigma_0"] = abs(dsigma_0)
if(not np.isnan(dsigma_1) and dsigma_1 is not None):
minuit.errors["sigma_1"] = abs(dsigma_1)
if(not np.isnan(dDCR) and dDCR is not None):
minuit.errors["DCR"] = abs(dDCR)
minuit.limits["mu"] = (self._eps, max(1, k_hi))
minuit.limits["G"] = (self._eps, None)
minuit.limits["x_0"] = (x_0-G/2, x_0+G/2)
minuit.limits["alpha"] = (self._eps, 1-self._eps)
minuit.limits["r_beta"] = (self._eps, 1-self._eps)
minuit.limits["lbda"] = (self._eps, 1-self._eps)
minuit.limits["sigma_0"] = (self._eps, G)
minuit.limits["sigma_1"] = (self._eps, G)
minuit.limits["DCR"] = (self._eps, None)
minuit.fixed["k_low"] = True
minuit.fixed["k_hi"] = True
minuit.fixed["tgate"] = True
minuit.fixed["tau"] = True
minuit.fixed["t_0"] = True
minuit.fixed["r_fast"] = True
minuit.fixed["dx"] = True
minuit.fixed["k_dcr_low"] = True
minuit.fixed["k_dcr_hi"] = True
minuit.fixed["tgate"] = True
minuit.strategy=2
minuit.errordef=minuit.LIKELIHOOD
minuit.migrad(ncall= self._n_call_minuit,
iterate =self._n_iterations_minuit)
minuit.hesse()
if(self._verbose):
print(minuit)
self._fit_minuit = minuit
self._fit_values_s = minuit.values.to_dict()
self._fit_errors_s = minuit.errors.to_dict()
self._fit_values = {}
self._fit_errors = {}
for key, value in self._fit_values_s.items():
if(key=="x_0"):
self._fit_values[key] = value*self.scaler.scale_[0] + self.scaler.center_[0]
elif(key=="G"):
self._fit_values[key] = value*self.scaler.scale_[0]
elif(key=="sigma_0"):
self._fit_values[key] = value*self.scaler.scale_[0]
elif(key=="sigma_1"):
self._fit_values[key] = value*self.scaler.scale_[0]
elif(key=="dx"):
self._fit_values[key] = self._GD_data["bw"]
else:
self._fit_values[key] = value
for key, value in self._fit_errors_s.items():
if(key=="x_0"):
self._fit_errors[key] = value*self.scaler.scale_[0] + self.scaler.center_[0]
elif(key=="G"):
self._fit_errors[key] = value*self.scaler.scale_[0]
elif(key=="sigma_0"):
self._fit_errors[key] = value*self.scaler.scale_[0]
elif(key=="sigma_1"):
self._fit_errors[key] = value*self.scaler.scale_[0]
elif(key=="dx"):
self._fit_errors[key] = 0.1*self._GD_data["bw"]
else:
self._fit_errors[key] = value
fit_out = {}
fit_val, fit_err = f_data.GetFitResults()
print("DCR_ORIG={:3.3E} \t DCR_FIT = {:3.3E}".format(dcr_orig, fit_val["DCR"]))
fit_out["Index"] = i data = []
fit_out["dcr_orig"] = dcr_orig prms = []
for key, value in fit_err.items(): for par in self._fit_minuit.parameters:
fit_out["d_{:s}".format(key)] = value if(self._fit_minuit.fixed[par]):
continue
fit_out.update(fit_val) prms.append(par)
data.append([self._fit_values[par], self._fit_errors[par]])
if out_dict == {}: data = np.asarray(data)
out_dict["dcr_orig"]=[] prms = np.asarray(prms)
out_dict["Index"]=[]
for key in fit_out.keys():
out_dict[key] = []
for key in fit_out.keys(): _print = pd.DataFrame(columns = prms,
out_dict[key].append(fit_out[key]) index=["Values", "Errors"], data = data.T).T
_print.style.format({
'Values': lambda val: f'{val:,.2E}',
'Errors': lambda val: f'{val:,.2E}'
})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment