## code starts### import os import numpy as np import pandas as pd from itertools import chain from iminuit import Minuit from iminuit.util import describe import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec from matplotlib import cm from numba import njit from difflib import ndiff import re from KDEpy import FFTKDE import scipy.special as sc from scipy.integrate import quad from scipy.stats import poisson, skew, norm, binom from scipy.interpolate import interp1d from scipy.signal import find_peaks, peak_widths from scipy.signal import fftconvolve as convolve from astropy.stats import knuth_bin_width, freedman_bin_width, scott_bin_width, bootstrap from sklearn.preprocessing import RobustScaler from AdditionalPDFs import gpoisson, borel class FakeFuncCode: def __init__(self, f, prmt=None, dock=0, append=None): #f can either be tuple or function object self.co_varnames = describe(f) self.co_argcount = len(self.co_varnames) self.co_argcount -= dock self.co_varnames = self.co_varnames[dock:] if prmt is not None: #rename parameters from the front for i, p in enumerate(prmt): self.co_varnames[i] = p if isinstance(append, str): append = [append] if append is not None: old_count = self.co_argcount self.co_argcount += len(append) self.co_varnames = tuple( list(self.co_varnames[:old_count]) + append + list(self.co_varnames[old_count:])) class BandWidthOptimiser: #Hobbema, Hermans, and Van den Broeck (1971) and by Duin (1976) #leave-one-out cross-validation approach #https://cran.r-project.org/web/packages/kedd/vignettes/kedd.pdf def _Dummy(self, _, bw): return bw def _KDE(self, bw): try: x_kde, y_kde = FFTKDE(kernel = self.kernel, bw=bw).fit(self.data).evaluate(self.n_kde_samples) loss = np.log((self.N-1)*bw) - np.sum(np.log(y_kde)) return loss except: return np.nan def _PPF(self, data): """ Compute ECDF """ x = np.sort(data) n = x.size y = np.arange(1, n+1) / n _ppf = interp1d(y, x) return _ppf def __init__(self, data, kernel="gaussian", alpha = 0.95, 
n_kde_samples=2**13): if(alpha<=0.5): raise ValueError("alpha must be above 0.5") self.data = data self.PPF = self._PPF(self.data) self.data = self.data[(self.data<self.PPF(alpha))] self.N = len(data) self.kernel=kernel self.n_kde_samples=n_kde_samples self.last_arg = None self.func_code = FakeFuncCode(self._Dummy, dock=True) self.eps = np.finfo(np.float64).eps * 10 def __call__(self, *arg): self.last_arg = arg loss = self._KDE(*arg) return loss class BinnedLH: def __init__(self, f, x, y): self.f = f self.x = x self.y = y self.last_arg = None self.func_code = FakeFuncCode(f, dock=True) self.ndof = len(self.y) - (self.func_code.co_argcount - 1) self.eps = np.finfo(np.float64).eps * 10 self.eps_inv = 1/self.eps self.log_eps = np.log(self.eps) def Logify(self, y): return np.where(y>self.eps, np.log(y), self.log_eps) def __call__(self, *arg): self.last_arg = arg y_hat = self.f(self.x, *arg) # y_hat = np.nan_to_num(y_hat, nan=self.eps_inv) logL = self.y*(self.Logify(y_hat) - self.Logify(self.y)) + (self.y - y_hat) nlogL = -np.sum(logL) return nlogL class Chi2Regression: def __init__(self, f, x, y, y_err, epsilon=1.35): self.f = f self.x = x self.y = y self.y_err = y_err self.eps = np.finfo(np.float64).eps * 10 self.y_err[self.y_err<self.eps] = self.eps self.last_arg = None self.func_code = FakeFuncCode(f, dock=True) self.ndof = len(self.y) - (self.func_code.co_argcount - 1) def __call__(self, *arg): self.last_arg = arg loss = ((self.f(self.x, *arg) - self.y)/(self.y_err))**2 return np.sum(loss) class HuberRegression: def __init__(self, f, x, y, y_err, delta=1.35): self.f = f self.x = x self.y = y self.y_err = y_err self.delta = delta self.eps = np.finfo(np.float64).eps * 10 self.y_err[self.y_err<self.eps] = self.eps self.last_arg = None self.func_code = FakeFuncCode(f, dock=True) self.ndof = len(self.y) - (self.func_code.co_argcount - 1) def __call__(self, *arg): self.last_arg = arg a = abs((self.y - self.f(self.x, *arg))/self.y_err) cond_flag = (a>self.delta) loss = 
np.sum((~cond_flag) * (0.5 * a ** 2) - (cond_flag) * self.delta * (0.5 * self.delta - a), -1) return np.sum(loss) @njit def _SelectRangeNumba(array, low_lim, hi_lim): index = (array >= low_lim) & (array <= hi_lim) return array[index] class PeakOTron: def __init__(self, verbose=False, n_call_minuit=1000, n_iterations_minuit = 10 ): self.Init() self.greek_letters = ["Alpha", "alpha", "Beta", "beta", "Gamma", "gamma", "Delta", "delta", "Epsilon", "epsilon", "Zeta", "zeta", "Eta", "eta", "Theta", "theta", "Iota", "iota", "Kappa", "kappa", "Lbda", "lbda", "Mu", "mu", "Nu", "nu", "Xi", "xi", "Omicron", "omicron", "Pi", "pi", "Rho", "rho", "Sigma", "sigma", "Tau", "tau", "Upsilon", "upsilon", "Phi", "phi", "Chi", "chi", "Psi", "psi", "Omega", "omega"] self._eps = np.finfo(np.float64).eps * 10 self._eps_kde = 1e-6 self._FWHM2Sigma = 1/(2*np.sqrt(2*np.log(2))) self._plot_figsize= (10,10) self._plot_fontsize= 20 self._cmap = cm.get_cmap('viridis') self._max_n_peaks = 12 self._max_n_dcr_peaks = 6 self._len_DCR_pad = int(100) self._verbose=verbose self._n_call_minuit = n_call_minuit self._n_iterations_minuit = n_iterations_minuit self._is_histogram = False self._default_fit_kwargs={ "tau":None, "t_0":None, "r_fast":None, "tgate":None, "pedestal":None, "n_bootstrap": 1000, "n_bootstrap_kde":100, "bw":None, "peak_dist_factor":0.8, "peak_width_factor":0.5, "prominence": 0.0, "min_n_peak_evts": 100, "kernel_kde":"gaussian", "bandwidth_kde":"ISJ", "ppf_mainfit":1 - 1e-6, "bin_method":"knuth" } def LevDist(self, str1, str2): counter = {"+": 0, "-": 0} distance = 0 for edit_code, *_ in ndiff(str1, str2): if edit_code == " ": distance += max(counter.values()) counter = {"+": 0, "-": 0} else: counter[edit_code] += 1 distance += max(counter.values()) return distance ###HELPER FUNCTIONS def Init(self): self._peak_data = {} self._GD_data = {} self._DCR_data={} self._fit_values= {} self._fit_errors = {} self._fit = None self._failed = False def LatexFormat(self, f, scirange=[0.01,1000]): 
if(np.abs(f)>scirange[0] and np.abs(f)<scirange[1]): float_str = r"${:3.3f}$".format(f) else: try: float_str = "{:3.3E}".format(f) base, exponent = float_str.split("E") float_str = r"${0} \times 10^{{{1}}}$".format(base, int(exponent)) except: float_str=str(f) return float_str def SelectRange(self, array, low_lim, hi_lim): return _SelectRangeNumba(array, low_lim, hi_lim) def Logify(self, y): log_y = np.log(y) min_log_y = np.min(log_y[y>0]) return np.where(y>0, log_y, min_log_y) def SetMaxNPeaks(self, max_n_peaks): max_n_peaks = int(max_n_peaks) if(max_n_peaks<1): raise Exception("Maximum number of peaks must be greater than 2.") self._max_n_peaks = max_n_peaks print("Set maximum number of peaks to {:d}.".format(self._max_n_peaks)) def SetMaxNDCRPeaks(self, max_n_dcr_peaks): max_n = int( max_n_dcr_peaks) if(max_n_dcr_peaks<2): raise Exception("Maximum number of peaks must be greater than 2.") self._max_n_dcr_peaks = max_n_dcr_peaks print("Set maximum number of dark peaks to {:d}.".format(self._max_n_dcr_peaks)) def DummyLinear(self, x, m, c): return m*x + c def DummyLogGPois(self, k, mu, lbda, A): return A + gpoisson.logpmf(k, mu, lbda) def SigmaK(self, k, sigma_0, sigma_1): return np.sqrt(sigma_0**2 + k*sigma_1**2) def PoisPPF(self, q, mu): vals = np.ceil(sc.pdtrik(q, mu)) vals1 = np.maximum(vals - 1, 0) temp = sc.pdtr(vals1, mu) return np.where(temp >= q, vals1, vals) def GPoisPPF(self, q, mu, lbda, k_max): _sum = 0 _count = 0 while _count<=k_max: _sum += gpoisson.pmf(_count, mu, lbda) if(_sum<q): _count+=1 else: break return _count def EstLbda(self, data_sub_x_0): _skew = skew(data_sub_x_0) _mean = np.mean(data_sub_x_0) _std = np.std(data_sub_x_0) _lbda = 0.5*(((_mean*_skew)/(_std))- 1) return _lbda def EstGain(self, data_sub_x_0): _skew = skew(data_sub_x_0) _mean = np.mean(data_sub_x_0) _std = np.std(data_sub_x_0) _lbda = 0.5*(((_mean*_skew)/(_std))- 1) _gain = (_std**2/_mean)*((1 - _lbda)**2) return _gain def EstMu(self, data_sub_x_0): _skew = 
skew(data_sub_x_0) _mean = np.mean(data_sub_x_0) _std = np.std(data_sub_x_0) _lbda = 0.5*(((_mean*_skew)/(_std))- 1) _mu = (1/(1-_lbda))*(_mean**2/_std**2) return _mu def Bootstrap(self, data, statistic, alpha=0.95, n_samples=10000): if not (0 < alpha < 1): raise ValueError("confidence level must be in (0, 1)") n = n_samples if n <= 0: raise ValueError("data must contain at least one measurement.") boot_stat = bootstrap(data, n_samples, bootfunc=statistic) stat_data = statistic(data) mean_stat = np.mean(boot_stat) est_stat = 2*stat_data - mean_stat std_err = np.std(boot_stat) z_score = np.sqrt(2.0)*sc.erfinv(alpha) conf_interval = est_stat + z_score*np.array((-std_err, std_err)) return est_stat, std_err, conf_interval def BootstrapKDE(self, data, kernel = "gaussian", bw="optimise", n_kde_samples=2**13, alpha=0.95, n_samples=1000, limits=None ): if not (0 < alpha < 1): raise ValueError("Bootstrap confidence level, alpha, must be in (0, 1).") n = n_samples if n <= 0: raise ValueError("Bootstrap data must contain at least one measurement.") kernels = list(FFTKDE._available_kernels.keys()) if kernel not in kernels: raise ValueError("Selected KDE kernel not available. Please choose from these options: {:s}.".format(", ".join(kernels))) if(not isinstance(bw, float)): bws = list(FFTKDE._bw_methods.keys()) + ["optimise"] if bw not in bws: raise ValueError("Selected KDE bandwidth estimation method not available. Please choose from these options: {:s}.".format(", ".join(bws))) if(bw=="optimise"): try: BWO= BandWidthOptimiser(data, kernel=kernel) bw_scott = scott_bin_width(data) minuit_kde = Minuit(BWO, bw=bw_scott/2) minuit_kde.limits["bw"]=(self._eps_kde, 3*bw_scott) minuit_kde.strategy=2 minuit_kde.migrad(ncall= self._n_call_minuit, iterate =self._n_iterations_minuit) if(minuit_kde.valid): bw = minuit_kde.values["bw"] if(self._verbose): print("KDE Bandwidth optimised to: {0}".format(bw)) else: if(self._verbose): print("KDE Bandwidth optimisation failed. 
Defaulting to Improved Sheather Jones.") bw = "ISJ" except: if(self._verbose): print("KDE Bandwidth optimisation failed. Defaulting to Improved Sheather Jones.") bw = "ISJ" kde = FFTKDE(kernel = kernel, bw=bw).fit(data) x_kde, y_kde_orig = kde.evaluate(n_kde_samples) boot_data = bootstrap(data, n_samples) y_kde_bs = np.vstack([FFTKDE(kernel = kernel, bw=bw).fit(_data).evaluate(x_kde) for _data in boot_data]) y_kde_mean = np.mean(y_kde_bs, axis=0) y_kde = 2*y_kde_orig - y_kde_mean y_kde_err = np.std(y_kde_bs, axis=0) z_score = np.sqrt(2.0)*sc.erfinv(alpha) y_kde_conf_interval = y_kde + z_score*np.array((-y_kde_err, y_kde_err)) if(limits is not None): cond_inrange =(x_kde>np.min(limits)) & (x_kde<np.max(limits)) x_kde = x_kde[cond_inrange] y_kde = y_kde[cond_inrange] y_kde_err = y_kde_err[cond_inrange] return x_kde, y_kde, y_kde_err, y_kde_conf_interval ##MODEL DEFINITIONS def DRM_basic(self, x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi): k = np.arange(k_low, k_hi) k = sorted(list(k[k!=0])) k_is = [k[0:int(_k)] for _k in k] f0 = gpoisson.pmf(0, mu, lbda)*norm.pdf(x, x_0, sigma_0) f1s = np.asarray([ binom.pmf(0, _k, alpha)*gpoisson.pmf(_k, mu, lbda)*norm.pdf(x, x_0 + _k*G, self.SigmaK(_k, sigma_0, sigma_1)) for _k in k ]) f2s = np.vstack([ np.asarray([ binom.pmf(_i, _k, alpha)*gpoisson.pmf(_k, mu, lbda)*self.XTM(x, _k, _i, x_0, G, sigma_0, sigma_1, r_beta) for _i in _is ]) for _k, _is in zip(k, k_is) ]) f = np.sum(np.vstack([f0, f1s, f2s]), axis=0) return f def XTM(self, x, k, i, x_0, G, sigma_0, sigma_1, r_beta): x_k = x - (x_0 + k*G) _sigma_k = self.SigmaK(k, sigma_0, sigma_1) x_ct_pad = np.zeros_like(x_k) if(i==1): cond = (x_k > -5*_sigma_k) error_f = 0.5*(1.0 + sc.erf(x_k/(_sigma_k*np.sqrt(2.0)))) log_f = -x_k/(r_beta*G) - np.log(r_beta*G) x_ct = np.where(cond, np.exp(log_f)*error_f, x_ct_pad) elif(i>1): cond = (x_k > 0) log_f = sc.xlogy(i-1, x_k) - sc.gammaln(i) - sc.xlogy(i, r_beta*G) - x_k/(r_beta*G) x_ct = np.where(cond, np.exp(log_f), 
x_ct_pad) else: x_ct = x_k return x_ct def PH_range(self, t, t_0, r_fast, tau, tgate): if((t>t_0) and (t<0)): PH_min = (1-r_fast)*np.exp(t_0/tau)*(1 - np.exp(-tgate/tau)) PH_max = (1-r_fast)*(1 - np.exp(-tgate/tau)) elif((t>0) and (t<tgate)): PH_min = r_fast PH_max = (1 - (1-r_fast)*np.exp(-tgate/tau)) else: PH_min = 0 PH_max = 0 return PH_min, PH_max def dpdPH(self, x, x_0, G, tau, tgate, t_0, r_fast): PH_bfg_low, PH_bfg_hi = self.PH_range(-abs(t_0)/2, -abs(t_0), r_fast, tau, tgate) PH_dg_low, PH_dg_hi = self.PH_range(tgate/2, -abs(t_0), r_fast, tau, tgate) x_norm = (x-x_0)/(G) PHs = np.zeros_like(x_norm) cond_bfg = (x_norm>PH_bfg_low) & (x_norm<=PH_bfg_hi) cond_dg = (x_norm>PH_dg_low) & (x_norm<=PH_dg_hi) PHs[cond_bfg] += 1/x_norm[cond_bfg] PHs[cond_dg] += 1/(1 - x_norm[cond_dg]) return PHs def ApplyDCR(self, x, y_light, x_0, G, DCR, tgate, t_0, tau, r_fast, lbda, dx, k_dcr_low, k_dcr_hi): mu_dcr = DCR*(abs(t_0) + tgate) x_lin = np.arange(0, len(x)) x_pad_lin = np.arange(-self._len_DCR_pad, len(x)+self._len_DCR_pad) f_lin = interp1d(x, x_lin, fill_value='extrapolate') G_lin = f_lin(x_0+G) - f_lin(x_0) x_0_lin = f_lin(x_0) idx_orig = np.where(np.in1d(x_pad_lin, np.intersect1d( x_pad_lin, x_lin ) ), True, False) idx_x_0_lin = np.argmin(abs(x_pad_lin - x_0_lin)) n_dcr = np.arange(k_dcr_low, max(2, k_dcr_hi) ) fs = [] pfs = [] hs = [] phs = [] for _n_dcr in n_dcr: if(_n_dcr==0): f = np.zeros(len(x_pad_lin)) f[idx_x_0_lin] = 1 pf = poisson.pmf(0, mu_dcr) h = np.zeros(len(x_pad_lin)) ph = 0 else: if(len(fs)<=1): f = self.dpdPH(x_pad_lin, x_0_lin, G_lin, tau, tgate, t_0, r_fast) else: f = convolve(fs[1], fs[-1])[idx_x_0_lin:len(x_pad_lin)+idx_x_0_lin] f = f/ np.trapz(f, dx = 1) pf = poisson.pmf(_n_dcr, mu_dcr)*(borel.pmf(0, lbda)**_n_dcr) h = self.dpdPH(x_pad_lin, x_0_lin, G_lin*(_n_dcr+1), tau, tgate, t_0, r_fast) h = h/ np.trapz(h, dx = 1) ph = poisson.pmf(1, mu_dcr)*borel.pmf(_n_dcr, lbda)/(_n_dcr+1) fs.append(f) pfs.append(pf) hs.append(h) phs.append(ph) fs = 
np.asarray(fs) hs = np.asarray(hs) pfs = np.expand_dims(np.asarray(pfs), -1) phs = np.expand_dims(np.asarray(phs), -1) y_dark = np.sum(fs*pfs, axis=0) + np.sum(hs*phs, axis=0) y_dark = y_dark/np.trapz(h, dx = 1) y_model = convolve(y_dark, np.pad(y_light, (self._len_DCR_pad, self._len_DCR_pad), "constant", constant_values=(0.0,0.0)), )[idx_x_0_lin:len(x_pad_lin)+idx_x_0_lin] y_model = y_model/np.trapz(y_model, dx = 1)/dx y_model = y_model[idx_orig] return y_model def DRM_nodcr( self, x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi): y_model = self.DRM_basic(x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi, dx ) return y_model def DRM( self, x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi, k_dcr_low, k_dcr_hi, DCR, tau, tgate, t_0, r_fast, dx ): y_light = self.DRM_basic(x, mu, x_0, G, sigma_0, sigma_1, alpha, r_beta, lbda, k_low, k_hi ) y_model = self.ApplyDCR(x, y_light, x_0, G, DCR, tgate, t_0, tau, r_fast, lbda, dx, k_dcr_low, k_dcr_hi) return y_model ###PLOTTING def PlotOriginalHistogram(self, ax): ax.plot(self._GD_data["x_s"], self._GD_data["y_s"], lw=5, label="Data") ax.set_title("Normalised \n Input \n Distribution",fontsize=self._plot_fontsize) ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.set_yscale("log") ax.legend(fontsize=max(10, self._plot_fontsize//1.5)) def PlotKDE(self, ax): ax.plot(self._GD_data["x_s"], self._GD_data["y_s"], label="Data", lw=5 ) ax.plot(self._GD_data["x_kde_s"], self._GD_data["y_kde_s"], lw=2.5, label="KDE") ax.set_title("Kernel Density Estimate", fontsize=self._plot_fontsize) ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.set_yscale("log") ax.set_ylim([1e-5, None]) 
ax.legend(fontsize=max(10, self._plot_fontsize//1.5)) def PlotPeaks(self, ax): ax.plot(self._GD_data["x_s"], self._GD_data["y_s"], label="Input Pulse \n Height \n Spectrum", lw=5 ) color_vals = [self._cmap(_v) for _v in np.linspace(0,0.75, len(self._peak_data["x_peak_s"]))] for _i_x, (_x, _x_w, _c_v) in enumerate(zip( self._peak_data["x_peak_s"], self._peak_data["x_width_s"], color_vals )): _x_peak =_x _x_width_low = _x - _x_w/2 _x_width_hi = _x + _x_w/2 _y_peak = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_peak))] _y_width_low = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_width_low))] _y_width_hi = self._GD_data["y_s"][np.argmin(abs(self._GD_data["x_s"]-_x_width_hi))] if(_i_x == 0): annotation_text = "Pedestal" color = "red" else: annotation_text = "Peak {:d}".format(_i_x) color = _c_v ax.vlines(x=[_x], ymin=0, ymax=_y_peak, color=color, linestyle="-", lw=2.5, label = '\n{:s}: \n $x_{{pos}}$ = {:3.3f} Q.U \n $\Delta x_{{pos}}$ {:3.3f} Q.U \n'.format( annotation_text, _x_peak, _x_width_hi - _x_width_low) ) ax.vlines(x=[_x_width_low], ymin=0, ymax=_y_width_low, color=color, lw=2.5, linestyle="--") ax.vlines(x=[_x_width_hi], ymin=0, ymax=_y_width_hi, color=color, lw=2.5, linestyle="--") ax.axvline(x=[self._GD_data["limit_s"]], color = "purple", lw=5, label="Pedestal Threshold ($PH_{{0}} - \\frac{G_{est}}{2}$)") box = ax.get_position() ax.set_position([box.x0, box.y0, box.width * 0.8, box.height]) # Put a legend to the right of the current axis ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=max(10, self._plot_fontsize//2), ncol=len(self._peak_data["x_peak_s"])//6 + 1 ) ax.set_title("Peak Finding \n # Peaks = {:d}".format(len(self._peak_data["x_peak_s"])) , fontsize=self._plot_fontsize) ax.set_xlabel("IQR Normalised Charge Units [Q.U.]", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.set_yscale("log") def PlotGenPois(self, ax): 
ax.scatter(self._peak_data["n_peak_s"], self.Logify(self._peak_data["y_peak_s"]), label="Extracted Peak Heights" ) y_lgp = self.DummyLogGPois(self._peak_data["n_peak_s"], self._GD_data["fit_GP_mu"], self._GD_data["fit_GP_lbda"], self._GD_data["fit_GP_A"] ) ax.plot(self._peak_data["n_peak_s"], y_lgp, linestyle="--", lw=2.5, color="red", label="Fit") ax.set_title( "Generalised Poisson Fit \n $\\mu$ = {0} $\pm$ {1} \n $\\lambda$ = {2} $\pm$ {3} \n $A$ = {4} $\pm$ {5}".format( self.LatexFormat(self._GD_data["fit_GP_mu"]), self.LatexFormat(self._GD_data["fit_GP_mu_err"]), self.LatexFormat(self._GD_data["fit_GP_lbda"]), self.LatexFormat(self._GD_data["fit_GP_lbda_err"]), self.LatexFormat(self._GD_data["fit_GP_A"]), self.LatexFormat(self._GD_data["fit_GP_A_err"]) ), fontsize=self._plot_fontsize) ax.set_xlabel("Peak", fontsize=self._plot_fontsize) ax.set_ylabel("$\\log{\\frac{dP}{dPH}}$", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.legend(fontsize=max(10, self._plot_fontsize//1.5)) def PlotSigmaEst(self, ax): ax.set_title("Estimated Peak Variance \n $\\sigma^{{2}}_{{0}}$={0} $\pm$ {1} \n $\\sigma^{{2}}_{{1}}$={2} $\pm$ {3}".format( self.LatexFormat(self._GD_data["fit_var0"]), self.LatexFormat(self._GD_data["fit_var0_err"]), self.LatexFormat(self._GD_data["fit_var1"]), self.LatexFormat(self._GD_data["fit_var1_err"]), ), fontsize=self._plot_fontsize) ax.errorbar(self._peak_data["n_peak_s"], self._peak_data["peak_var_s"], yerr=self._peak_data["peak_var_err_s"], label="Extracted Variances", fmt="o") ax.plot(self._peak_data["n_peak_s"], self.DummyLinear(self._peak_data["n_peak_s"], self._GD_data["fit_var1"], self._GD_data["fit_var0"] ), linestyle="--", lw=2.5, color="red", label="Fit") ax.set_xlabel("Peak", fontsize=self._plot_fontsize) ax.set_ylabel("$\sigma^{2}_{peak}$", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) 
ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.legend(fontsize=max(10, self._plot_fontsize//1.5)) def PlotGainEst(self, ax): ax.set_title("Estimated Gain \n $G$={0} $\pm$ {1} \n $PH_{{0}}$={2} $\pm$ {3}".format( self.LatexFormat(self._GD_data["fit_G"]), self.LatexFormat(self._GD_data["fit_G_err"]), self.LatexFormat(self._GD_data["fit_x_0"]), self.LatexFormat(self._GD_data["fit_x_0_err"]), ), fontsize=self._plot_fontsize) ax.errorbar(self._peak_data["n_peak_s"], self._peak_data["peak_mean_s"], yerr=self._peak_data["peak_mean_err_s"], label="Extracted Peak Means", fmt="o") ax.plot(self._peak_data["n_peak_s"], self.DummyLinear(self._peak_data["n_peak_s"], self._GD_data["fit_G"], self._GD_data["fit_x_0"] ), linestyle="--", lw=2.5, color="red", label="Fit") ax.set_xlabel("Peak", fontsize=self._plot_fontsize) ax.set_ylabel("$<Peak>$", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.legend(fontsize=max(10, self._plot_fontsize//1.5)) def PlotDCREst(self, ax): x_dc_kde = self._DCR_data["x_dc_kde"] y_dc_kde = self._DCR_data["y_dc_kde"] y_dc_kde_err = self._DCR_data["y_dc_kde_err"] cond_p_0toLM = self._DCR_data["cond_p_0toLM"] cond_p_LMto2LM = self._DCR_data["cond_p_LMtoL2M"] cond_interpeak = self._DCR_data["cond_interpeak"] int_p_0toLM = self._DCR_data["int_p_0toLM"] dint_p_0toLM = self._DCR_data["dint_p_0toLM"] int_p_LMto2LM = self._DCR_data["int_p_LMto2LM"] dint_p_LMto2LM = self._DCR_data["dint_p_LMto2LM"] mean_p_interpeak = self._DCR_data["mean_p_interpeak"] dmean_p_interpeak = self._DCR_data["dmean_p_interpeak"] ax.fill_between( x_dc_kde[cond_p_0toLM], y_dc_kde[cond_p_0toLM], color='none', hatch="///", edgecolor="b", label = r"$I_{{0 \rightarrow x_{{min}}}}$ = {:s} $\pm$ {:s}".format(self.LatexFormat(int_p_0toLM), self.LatexFormat(dint_p_0toLM) ) ) ax.fill_between( x_dc_kde[cond_p_LMto2LM], y_dc_kde[cond_p_LMto2LM], color='none', hatch="///", edgecolor="r", label 
= r"$I_{{x_{{min}} \rightarrow 2x_{{min}}}}$ = {:s} $\pm$ {:s}".format(self.LatexFormat(int_p_LMto2LM), self.LatexFormat(dint_p_LMto2LM) ) ) ax.fill_between( x_dc_kde[cond_interpeak], y_dc_kde[cond_interpeak], color='none', hatch="xxx", edgecolor="g", label = r"$<I_{{x_{{min}} - x_{{lim}} \rightarrow x_{{min}} + x_{{lim}} }}>$ = {:s} $\pm$ {:s}".format( self.LatexFormat(mean_p_interpeak), self.LatexFormat(dmean_p_interpeak) ) ) ax.plot(x_dc_kde, y_dc_kde, linestyle="-", color="purple", label="KDE, 95% Confidence" ) ax.fill_between(x_dc_kde, y1=y_dc_kde - 1.96*y_dc_kde_err, y2=y_dc_kde + 1.96*y_dc_kde_err, alpha=0.2, color="purple", label="KDE, 95% Confidence" ) ax.set_title("Dark Count Rate \n DCR = {0} $\pm$ {1}".format( self.LatexFormat(self._DCR_data["fit_DCR"]), self.LatexFormat(self._DCR_data["fit_DCR_err"])), fontsize=self._plot_fontsize) ax.set_xlabel("$PH_{norm, est}$", fontsize=self._plot_fontsize) ax.set_ylabel("$\\frac{dp_{DCR}}{PH_{norm, est}}$", fontsize=self._plot_fontsize) ax.tick_params(axis="x", labelsize=self._plot_fontsize) ax.tick_params(axis="y", labelsize=self._plot_fontsize) ax.legend(fontsize=max(10, self._plot_fontsize//1.5)) ax.set_yscale("log") def PlotFit(self, figsize=(10.5,10.5), labelsize=None, ticksize=None, titlesize=None, legendsize=None, title=None, xlabel = None, scaled=False, save_directory=None, y_limits = [None, None], residual_limits = [-5, 5], linewidth_main = 5, x_limits = [None, None], display=True ): if(labelsize is None): labelsize=self._plot_fontsize if(ticksize is None): ticksize=self._plot_fontsize if(titlesize is None): titlesize = self._plot_fontsize if(legendsize is None): legendsize = max(10, self._plot_fontsize//1.5) if(scaled): x = self._GD_data["x_s"] y = self._GD_data["y_s"] y_err = self._GD_data["y_err_s"] y_hat = self.GetModelIQR(self._GD_data["x_s"]) if(xlabel is None): xlabel = "IQR Normalised Charge Units [Q.U.]" else: x = self._GD_data["x"] y = self._GD_data["y"] y_err = self._GD_data["y_err"] y_hat = 
self.GetModel(self._GD_data["x"]) if(xlabel is None): xlabel = "Input Charge Units [Q.U.]" fig = plt.figure(figsize=figsize) gs = gridspec.GridSpec(2, 1, height_ratios=[2, 1]) ax0 = plt.subplot(gs[0]) ax0.plot(x, y, label="Data", lw=5, color="C0") ax0.plot(x, y_hat, linestyle="--", label="Model", lw=5, color="C1") ax0.set_yscale("log") ax0.set_xlabel("$\\frac{dp}{dPH}$", fontsize=labelsize) ax0.tick_params(axis="x", labelsize=ticksize) ax0.tick_params(axis="y", labelsize=ticksize) ax0.set_ylim(y_limits) ax0.set_xlim(x_limits) if(title is not None): ax0.set_title(title, fontsize=titlesize) ax0.legend(fontsize=legendsize) ax1 = plt.subplot(gs[1], sharex = ax0) ax1.scatter(x, (y - y_hat)/(y_err), color="C1") ax1.tick_params(axis="x", labelsize=ticksize) ax1.tick_params(axis="y", labelsize=ticksize) ax1.set_xlabel(xlabel, fontsize=labelsize) ax1.set_ylim(residual_limits) ax1.axhline(y=-1.0, color="purple", lw=2.5, linestyle="--") ax1.axhline(y=1.0, color="purple", lw=2.5, linestyle="--") ax1.tick_params(axis="x", labelsize=self._plot_fontsize) ax1.tick_params(axis="y", labelsize=self._plot_fontsize) fig.tight_layout() fig.subplots_adjust(hspace=.0) plt.pause(0.01) if(save_directory is not None): print("Saving figure to {0}...".format(save_directory)) fig.savefig(save_directory) if(display): fig.show() def PlotSummary(self, save_directory=None): fig = plt.figure(figsize=(20,40)) gs = gridspec.GridSpec(4, 2) gs.update(wspace=0.5, hspace=0.5) ax0 = fig.add_subplot(gs[0,0]) self.PlotOriginalHistogram(ax0) ax1 = fig.add_subplot(gs[0,1]) self.PlotKDE(ax1) ax2 = fig.add_subplot(gs[1,:]) self.PlotPeaks(ax2) ax3 = fig.add_subplot(gs[2,0]) self.PlotGenPois(ax3) ax4 = fig.add_subplot(gs[2,1]) self.PlotSigmaEst(ax4) ax5 = fig.add_subplot(gs[3,0]) self.PlotGainEst(ax5) ax6 = fig.add_subplot(gs[3,1]) self.PlotDCREst(ax6) if(save_directory is not None): print("Saving figure to {0}...".format(save_directory)) fig.savefig(save_directory) if(display): fig.show() def GetModel(self, x): 
return self.DRM(x, **self._fit_values) def GetModelIQR(self, x): return self.DRM(x, **self._fit_values_s) def GetOriginalDistribution(self): return _self._GD_data["x"], self._GD_data["y"] def GetFitResults(self): return self._fit_values, self._fit_errors def Failed(self): return self._failed ## PREFIT def GetHistAndPeaks(self, data, bw, peak_dist_factor, peak_width_factor, pedestal, prominence, bin_method, n_bootstrap, n_bootstrap_kde, bandwidth_kde, kernel_kde, min_n_peak_evts): self.scaler = RobustScaler() data_s = np.squeeze(self.scaler.fit_transform(data.reshape(-1,1))) pedestal_s = (pedestal - self.scaler.center_[0])/self.scaler.scale_[0] if(bw is None): if(bin_method == "knuth"): bw_s, bins_s = knuth_bin_width(data_s, return_bins=True) elif(bin_method=="freedman"): bw_s, bins_s = freedman_bin_width(data_s, return_bins=True) elif(bin_method=="scott"): bw_s, bins_s = scott_bin_width(data_s, return_bins=True) else: raise Exception ("Please specify a bin width by setting 'bw' or choose a method from 'knuth', 'freedman' or 'scott'") nbins_s = len(bins_s) bw = bw_s*self.scaler.scale_[0] nbins = np.ceil((data.max() - data.min()) / bw) nbins = max(1, nbins) bins = data.min() + bw * np.arange(nbins + 1) else: bw_s = bw/self.scaler.scale_[0] nbins = np.ceil((data.max() - data.min()) / bw) nbins = max(1, nbins) bins = data.min() + bw * np.arange(nbins + 1) nbins_s = np.ceil((data_s.max() - data_s.min()) / bw_s) nbins_s = max(1, nbins_s) bins_s = data_s.min() + bw_s * np.arange(nbins_s + 1) #Calculate histogram information y_N, x = np.histogram(data, bins=bins, density=False) N = np.sum(y_N) y = y_N/N/bw y_err = y*np.sqrt(1/y_N + 1/N) x = (x[:-1] + x[1:])/2. y_N_s, x_s = np.histogram(data_s, bins=bins_s, density=False) N_s = np.sum(y_N_s) y_s = y_N_s/N_s/bw_s y_err_s = y_s*np.sqrt(1/y_N_s + 1/N_s) x_s = (x_s[:-1] + x_s[1:])/2. 
try: x_kde_s, y_kde_s, y_kde_err_s, _, = self.BootstrapKDE(data_s, kernel=kernel_kde, bw=bandwidth_kde, n_samples=n_bootstrap_kde) f_lin_kde_s = interp1d(x_kde_s, np.arange(len(x_kde_s)), fill_value='extrapolate') except: print("Initial KDE failed. Setting fit fail status and continuing...") self._failed = True return est_mu, std_err_mu, _ = self.Bootstrap(data_s-pedestal_s, self.EstMu, n_samples=n_bootstrap) est_lbda, std_err_lbda, _ = self.Bootstrap(data_s - pedestal_s, self.EstLbda, n_samples=n_bootstrap) est_gain, std_err_gain, _ = self.Bootstrap(data_s - pedestal_s, self.EstGain, n_samples=n_bootstrap) est_gain_lin = abs(f_lin_kde_s(est_gain) - f_lin_kde_s(0)) peaks, properties = find_peaks(y_kde_s, distance=peak_dist_factor*est_gain_lin, prominence=prominence ) if(len(peaks)<3): if(self._verbose): print("Not enough peaks found with distance criterion to fit. Removing distance criterion and finding all peaks...") peaks, properties = find_peaks(y_kde_s, prominence=prominence ) limit_s = pedestal_s - 0.5*est_gain limit = pedestal - 0.5*est_gain*self.scaler.scale_[0] cond_limit_s = (x_kde_s[peaks]>limit_s) if(sum(cond_limit_s)<3): if(self._verbose): print("No peaks observed above pedestal - gain/2. 
Continuing without thresholding...") else: peaks = peaks[cond_limit_s] x_peaks_s = x_kde_s[peaks] y_peaks_s = y_kde_s[peaks] y_peaks_err_s = y_kde_err_s[peaks] temp_widths = np.asarray(peak_widths(y_kde_s, peaks, rel_height=peak_width_factor)) temp_widths[2:] = x_kde_s[temp_widths[2:].astype(int)] x_widths_s = temp_widths[3] - temp_widths[2] self._peak_data["x_peak_s"] = [] self._peak_data["y_peak_s"] = [] self._peak_data["y_peak_err_s"] = [] self._peak_data["n_peak_s"] = [] self._peak_data["x_width_s"] = [] n_valid_peaks = 0 for x_i, (y_peak, y_peak_err, x_peak, x_width) in enumerate(zip(y_peaks_s, y_peaks_err_s, x_peaks_s, x_widths_s)): N_evt_peak = len(self.SelectRange(data_s, x_peak-x_width/2, x_peak+x_width/2)) if(N_evt_peak<min_n_peak_evts): if(self._verbose): print("Peak {:d} has {:d} events. Less than event threshold {:d}. Ignoring...".format(x_i, N_evt_peak, min_n_peak_evts)) else: if(self._verbose): print("Peak {:d} has {:d} events. Adding...".format(x_i, N_evt_peak, min_n_peak_evts)) self._peak_data["x_peak_s"].append(x_peak) self._peak_data["y_peak_s"].append(y_peak) self._peak_data["y_peak_err_s"].append(y_peak_err) self._peak_data["n_peak_s"].append(x_i) self._peak_data["x_width_s"].append(x_width) n_valid_peaks+=1 self._peak_data["x_peak_s"] = np.asarray(self._peak_data["x_peak_s"]) self._peak_data["y_peak_s"] = np.asarray(self._peak_data["y_peak_s"]) self._peak_data["y_peak_err_s"] = np.asarray(self._peak_data["y_peak_err_s"]) self._peak_data["n_peak_s"] = np.asarray(self._peak_data["n_peak_s"]) self._peak_data["x_width_s"] = np.asarray(self._peak_data["x_width_s"]) if(n_valid_peaks<3): print("Minimum 3 peaks covering a threshold {0} events required. Too few events in peaks to proceed. Setting fit fail status and continuing...".format(min_n_peak_evts)) self._failed = True return elif(self._peak_data["n_peak_s"][0]>0): if(self._verbose): print("Estimated Pedestal had fewer than threshold {0} events. 
    def PreFitGenPoisToPeaks(self, ppf_mainfit, n_bootstrap):
        """
        Fit a Generalised Poisson model to the detected peak amplitudes.

        Refines the earlier estimates of mu (mean number of Geiger
        discharges) and lbda (cross-talk/branching parameter) stored in
        self._GD_data by a chi2 fit of the log Generalised Poisson pmf
        (self.DummyLogGPois) to the log of the KDE peak heights.

        Parameters
        ----------
        ppf_mainfit : float
            Quantile passed to self.GPoisPPF to choose the highest peak
            number (k_hi) included in the main fit.
        n_bootstrap : int
            Not used in this method; kept for a uniform prefit signature.
        """
        ########
        #STEP 1
        ########
        # Starting values from the earlier peak-finding stage.
        fit_lbda = self._GD_data["fit_GP_lbda"]
        fit_dlbda = self._GD_data["fit_GP_lbda_err"]
        fit_x_0 = self._GD_data["fit_x_0"]
        fit_mu = self._GD_data["fit_GP_mu"]
        fit_dmu = self._GD_data["fit_GP_mu_err"]

        x_peak = self._peak_data["x_peak_s"]
        n_peak = self._peak_data["n_peak_s"]
        y_peak = self._peak_data["y_peak_s"]
        y_peak_err = self._peak_data["y_peak_err_s"]

        # Fit in log space: d(log y) = dy / y.
        log_y_peak = self.Logify(y_peak)
        dlog_y_peak = y_peak_err/y_peak

        # Seed for the normalisation A: mean offset between measured log
        # peak heights and the estimated GP log pmf.
        fit_A_estspec = gpoisson.logpmf(n_peak, fit_mu, fit_lbda)
        fit_A = np.nanmean(log_y_peak - fit_A_estspec)
        fit_dA = np.nanmean(2*dlog_y_peak)

        fit_dict_pois = {
            "A": fit_A,
            "mu": fit_mu,
            "lbda": fit_lbda,
        }

        fit_pois = Chi2Regression(
            self.DummyLogGPois,
            n_peak,
            log_y_peak,
            dlog_y_peak
        )

        minuit_pois = Minuit(fit_pois, **fit_dict_pois)
        minuit_pois.errordef = Minuit.LIKELIHOOD
        minuit_pois.strategy = 2
        minuit_pois.errors["mu"] = fit_dmu
        minuit_pois.errors["lbda"] = fit_dlbda
        # lbda must lie strictly inside (0, 1); mu strictly positive.
        minuit_pois.limits["lbda"] = (self._eps, 1-self._eps)
        minuit_pois.limits["mu"] = (self._eps, None)
        minuit_pois.migrad(ncall=self._n_call_minuit, iterate=self._n_iterations_minuit)
        minuit_pois.hesse()

        if(not minuit_pois.valid):
            # Minuit failed: fall back to the seed estimates.
            if(self._verbose):
                print('Minuit returned invalid for Generalised Poisson fit to peaks. Using basic estimated parameters instead...')
            self._GD_data["fit_GP_mu"] = fit_mu
            self._GD_data["fit_GP_mu_err"] = fit_dmu
            self._GD_data["fit_GP_lbda"] = fit_lbda
            self._GD_data["fit_GP_lbda_err"] = fit_dlbda
            self._GD_data["fit_GP_A"] = fit_A
            self._GD_data["fit_GP_A_err"] = fit_dA
        else:
            self._GD_data["fit_GP_mu"] = minuit_pois.values["mu"]
            self._GD_data["fit_GP_mu_err"] = minuit_pois.errors["mu"]
            self._GD_data["fit_GP_lbda"] = minuit_pois.values["lbda"]
            self._GD_data["fit_GP_lbda_err"] = minuit_pois.errors["lbda"]
            self._GD_data["fit_GP_A"] = minuit_pois.values["A"]
            self._GD_data["fit_GP_A_err"] = minuit_pois.errors["A"]

        # Range of primary Geiger peak numbers for the main fit.
        k_low = 0
        k_hi = self.GPoisPPF(ppf_mainfit,
                             self._GD_data["fit_GP_mu"],
                             self._GD_data["fit_GP_lbda"],
                             self._max_n_peaks
                             )
        self._GD_data["fit_GP_k_low"] = k_low
        self._GD_data["fit_GP_k_hi"] = k_hi

        # Clamp k_hi into [2, self._max_n_peaks].
        if(self._GD_data["fit_GP_k_hi"]<2):
            if(self._verbose):
                print("Too few primary Geiger peaks to fit. Artificially increasing max number of peaks from {:d} to 2...".format(self._GD_data["fit_GP_k_hi"]))
            self._GD_data["fit_GP_k_hi"] = max(self._GD_data["fit_GP_k_hi"], 2)
        elif(self._GD_data["fit_GP_k_hi"]>self._max_n_peaks):
            if(self._verbose):
                print("Too many primary Geiger peaks to fit. Artificially decreasing max number of peaks from {:d} to {:d}...".format(self._GD_data["fit_GP_k_hi"], self._max_n_peaks))
            self._GD_data["fit_GP_k_hi"] = min(self._GD_data["fit_GP_k_hi"], self._max_n_peaks)
    def PreFitSigmaAndGain(self, data, n_bootstrap):
        """
        Estimate the peak widths (sigma_0, sigma_1) and refine gain/pedestal.

        For each detected peak, bootstrap the mean and variance of the
        (scaled) charges inside the peak window, then perform two robust
        (Huber) straight-line fits versus peak number:
          * variance vs n  -> fit_var0 (intercept) and fit_var1 (slope),
            converted to fit_sigma0 / fit_sigma1,
          * mean vs n      -> gain fit_G (slope) and pedestal fit_x_0
            (intercept).
        Results are stored in self._GD_data; if a Minuit fit is invalid,
        fallback estimates derived from the peak FWHMs (or the previous
        values) are used instead.

        Parameters
        ----------
        data : np.ndarray
            1D array of charges in original (unscaled) units.
        n_bootstrap : int
            Number of bootstrap resamples per peak window.
        """
        peak_val = self._peak_data["n_peak_s"]  # NOTE(review): unused local
        # Work in scaled coordinates, consistent with the stored peak positions.
        data_s = np.squeeze(self.scaler.transform(data.reshape(-1,1)))

        self._peak_data["peak_mean_s"]=[]
        self._peak_data["peak_mean_err_s"]=[]
        self._peak_data["peak_var_s"]=[]
        self._peak_data["peak_var_err_s"]=[]

        # Bootstrap mean and variance of the charges in each peak window
        # (x_peak +/- x_width/2).
        for x_i, y_peak, x_peak, x_width in zip(self._peak_data["n_peak_s"],
                                                self._peak_data["y_peak_s"],
                                                self._peak_data["x_peak_s"],
                                                self._peak_data["x_width_s"]):
            data_range = self.SelectRange(data_s,
                                          x_peak - x_width/2,
                                          x_peak + x_width/2)
            est_mean, std_err_mean, _ = self.Bootstrap(data_range, np.mean, n_samples=n_bootstrap)
            est_var, std_err_var, _ = self.Bootstrap(data_range, np.var, n_samples=n_bootstrap)
            self._peak_data["peak_mean_s"].append(est_mean)
            self._peak_data["peak_mean_err_s"].append(std_err_mean)
            self._peak_data["peak_var_s"].append(est_var)
            self._peak_data["peak_var_err_s"].append(std_err_var)

        self._peak_data["peak_mean_s"] = np.asarray(self._peak_data["peak_mean_s"])
        self._peak_data["peak_mean_err_s"] = np.asarray(self._peak_data["peak_mean_err_s"])
        self._peak_data["peak_var_s"] = np.asarray(self._peak_data["peak_var_s"])
        self._peak_data["peak_var_err_s"] = np.asarray(self._peak_data["peak_var_err_s"])

        n_peaks_select = self._peak_data["n_peak_s"]
        peaks_var_select = self._peak_data["peak_var_s"]
        peaks_var_err_select = self._peak_data["peak_var_err_s"]
        peaks_mean_select = self._peak_data["peak_mean_s"]
        peaks_mean_err_select = self._peak_data["peak_mean_err_s"]

        # Robust linear fit: peak variance vs peak number.
        fit_lin_var = HuberRegression(self.DummyLinear,
                                      n_peaks_select,
                                      peaks_var_select,
                                      peaks_var_err_select
                                      )
        # Slope seed: median local gradient; intercept seed: smallest variance.
        m_var_temp = np.median(np.gradient(peaks_var_select, n_peaks_select))
        c_var_temp = peaks_var_select[np.argmin(peaks_var_select)]

        fit_dict_lin_var = {
            "m": m_var_temp,
            "c": c_var_temp,
        }

        minuit_lin_var = Minuit(fit_lin_var, **fit_dict_lin_var)
        minuit_lin_var.strategy=2
        minuit_lin_var.migrad(ncall= self._n_call_minuit, iterate =self._n_iterations_minuit)
        minuit_lin_var.hesse()

        # Robust linear fit: peak mean vs peak number (gain and pedestal).
        fit_lin_G = HuberRegression(self.DummyLinear,
                                    n_peaks_select,
                                    peaks_mean_select,
                                    peaks_mean_err_select
                                    )
        m_G_temp = self._GD_data["fit_G"]
        c_G_temp = self._GD_data["fit_x_0"]

        fit_dict_lin_G = {
            "m": m_G_temp,
            "c": c_G_temp,
        }

        minuit_lin_G = Minuit(fit_lin_G, **fit_dict_lin_G)
        minuit_lin_G.strategy=2
        minuit_lin_G.migrad(ncall= self._n_call_minuit, iterate =self._n_iterations_minuit)
        minuit_lin_G.hesse()

        if((minuit_lin_var.values["m"]>self._eps) and minuit_lin_var.valid):
            self._GD_data["fit_var0"] = minuit_lin_var.values["c"]
            self._GD_data["fit_var0_err"] = minuit_lin_var.errors["c"]
            self._GD_data["fit_var1"] = minuit_lin_var.values["m"]
            self._GD_data["fit_var1_err"] = minuit_lin_var.errors["m"]
            # sigma = sqrt(var); d(sigma) = d(var) / (2*sigma).
            self._GD_data["fit_sigma0"] = np.sqrt(self._GD_data["fit_var0"])
            self._GD_data["fit_sigma0_err"] = (0.5/self._GD_data["fit_sigma0"])*self._GD_data["fit_var0_err"]
            self._GD_data["fit_sigma1"] = np.sqrt(self._GD_data["fit_var1"])
            self._GD_data["fit_sigma1_err"] = (0.5/self._GD_data["fit_sigma1"])*self._GD_data["fit_var1_err"]
        else:
            # Fallback: widths from the peak FWHMs with a nominal 10% error.
            if(self._verbose):
                print("Linear fit to peak variance returned invalid. Using basic estimated parameters instead...")
            self._GD_data["fit_sigma0"] = self._peak_data["x_width_s"][0]*self._FWHM2Sigma
            self._GD_data["fit_sigma0_err"] = 0.1*self._GD_data["fit_sigma0"]
            self._GD_data["fit_sigma1"] = abs(np.mean(np.diff(self._peak_data["x_width_s"]*self._FWHM2Sigma)/np.diff(self._peak_data["n_peak_s"])))
            self._GD_data["fit_sigma1_err"] = 0.1*self._GD_data["fit_sigma1"]
            self._GD_data["fit_var0"] = self._GD_data["fit_sigma0"]**2
            self._GD_data["fit_var0_err"] = 2*self._GD_data["fit_sigma0"]*self._GD_data["fit_sigma0_err"]
            self._GD_data["fit_var1"] = self._GD_data["fit_sigma1"]**2
            self._GD_data["fit_var1_err"] = 2*self._GD_data["fit_sigma1"]*self._GD_data["fit_sigma1_err"]

        if((minuit_lin_G.values["m"]>self._eps) and minuit_lin_G.valid):
            self._GD_data["fit_G"] = minuit_lin_G.values["m"]
            self._GD_data["fit_G_err"] = minuit_lin_G.errors["m"]
            self._GD_data["fit_x_0"] = minuit_lin_G.values["c"]
            self._GD_data["fit_x_0_err"] = minuit_lin_G.errors["c"]
        else:
            # Fallback: keep previous gain/pedestal (self-assignments are no-ops).
            if(self._verbose):
                print("Linear fit to peak mean returned invalid. Using basic estimated parameters instead...")
            self._GD_data["fit_G"] = self._GD_data["fit_G"]
            self._GD_data["fit_G_err"] = self._GD_data["fit_G_err"]
            self._GD_data["fit_x_0"] = self._GD_data["fit_x_0"]
            self._GD_data["fit_x_0_err"] = self._GD_data["fit_x_0_err"]
Using basic estimated parameters instead...") self._GD_data["fit_sigma0"] = self._peak_data["x_width_s"][0]*self._FWHM2Sigma self._GD_data["fit_sigma0_err"] = 0.1*self._GD_data["fit_sigma0"] self._GD_data["fit_sigma1"] = abs(np.mean(np.diff(self._peak_data["x_width_s"]*self._FWHM2Sigma)/np.diff(self._peak_data["n_peak_s"]))) self._GD_data["fit_sigma1_err"] = 0.1*self._GD_data["fit_sigma1"] self._GD_data["fit_var0"] = self._GD_data["fit_sigma0"]**2 self._GD_data["fit_var0_err"] = 2*self._GD_data["fit_sigma0"]*self._GD_data["fit_sigma0_err"] self._GD_data["fit_var1"] = self._GD_data["fit_sigma1"]**2 self._GD_data["fit_var1_err"] = 2*self._GD_data["fit_sigma1"]*self._GD_data["fit_sigma1_err"] if((minuit_lin_G.values["m"]>self._eps) and minuit_lin_G.valid): self._GD_data["fit_G"] = minuit_lin_G.values["m"] self._GD_data["fit_G_err"] = minuit_lin_G.errors["m"] self._GD_data["fit_x_0"] = minuit_lin_G.values["c"] self._GD_data["fit_x_0_err"] = minuit_lin_G.errors["c"] else: if(self._verbose): print("Linear fit to peak mean returned invalid. Using basic estimated parameters instead...") self._GD_data["fit_G"] = self._GD_data["fit_G"] self._GD_data["fit_G_err"] = self._GD_data["fit_G_err"] self._GD_data["fit_x_0"] = self._GD_data["fit_x_0"] self._GD_data["fit_x_0_err"] = self._GD_data["fit_x_0_err"] def PreFitDCR(self, data, tau, tgate, r_fast, t_0, ppf_mainfit, kernel_kde, bandwidth_kde, n_bootstrap_kde, dcr_lim=0.05): G = self._GD_data["fit_G"] mu = self._GD_data["fit_GP_mu"] lbda = self._GD_data["fit_GP_lbda"] x_0 = self._GD_data["fit_x_0"] data_norm = (np.squeeze(self.scaler.transform(data.reshape(-1,1))) - x_0)/G try: x_dc_kde, y_dc_kde, y_dc_kde_err, _ = self.BootstrapKDE( data_norm, kernel=kernel_kde, bw=bandwidth_kde, n_samples=n_bootstrap_kde ) except: print("DCR KDE failed. 
Setting fit fail status and continuing...") self._failed = True return cond_interpeak = (x_dc_kde>0) & (x_dc_kde<1) x_dc_kde = x_dc_kde[cond_interpeak] y_dc_kde = y_dc_kde[cond_interpeak] y_dc_kde_err = y_dc_kde_err[cond_interpeak] f_lin_kde_dc = interp1d(x_dc_kde, np.arange(0, len(x_dc_kde)), fill_value='extrapolate') f_lin_kde_dc_inv = interp1d(np.arange(0, len(x_dc_kde)), x_dc_kde, fill_value='extrapolate') f_log_kde_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde), fill_value='extrapolate') f_log_kde_upper_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde + y_dc_kde_err), fill_value='extrapolate') f_log_kde_lower_dc = interp1d(x_dc_kde, self.Logify(y_dc_kde - y_dc_kde_err), fill_value='extrapolate') cond_inrange = (x_dc_kde>0) & (x_dc_kde<1) x_dc_kde = x_dc_kde[cond_inrange] y_dc_kde = y_dc_kde[cond_inrange] y_dc_kde_err = y_dc_kde_err[cond_inrange] x_dc_min = x_dc_kde[np.argmin(y_dc_kde)] x_dc_lim_min_low = x_dc_min - dcr_lim x_dc_lim_min_hi = x_dc_min + dcr_lim x_dc_lim_low = 0 x_dc_lim_mid = x_dc_min x_dc_lim_hi = f_lin_kde_dc_inv(f_lin_kde_dc(0) + 2*abs(f_lin_kde_dc(x_dc_min) - f_lin_kde_dc(0))) cond_interpeak = (x_dc_kde> x_dc_lim_min_low) & (x_dc_kde<=x_dc_lim_min_hi) cond_p_0toLM = (x_dc_kde> x_dc_lim_low) & (x_dc_kde<=x_dc_lim_mid) cond_p_LMto2LM = (x_dc_kde> x_dc_lim_mid) & (x_dc_kde<=x_dc_lim_hi) mean_p_interpeak = (quad(lambda x: np.exp(f_log_kde_dc(x)), x_dc_lim_min_low, x_dc_lim_min_hi)[0])/(2*dcr_lim) mean_p_interpeak_upper = (quad(lambda x: np.exp(f_log_kde_upper_dc(x)), x_dc_lim_min_low, x_dc_lim_min_hi)[0])/(2*dcr_lim) mean_p_interpeak_lower = (quad(lambda x: np.exp(f_log_kde_lower_dc(x)), x_dc_lim_min_low, x_dc_lim_min_hi)[0])/(2*dcr_lim) dmean_p_interpeak = abs(mean_p_interpeak_upper - mean_p_interpeak_lower) int_p_0toLM = quad(lambda x: np.exp(f_log_kde_dc(x)), x_dc_lim_low, x_dc_lim_mid)[0] int_p_0toLM_upper = quad(lambda x: np.exp(f_log_kde_upper_dc(x)), x_dc_lim_low, x_dc_lim_mid)[0] int_p_0toLM_lower = quad(lambda x: 
np.exp(f_log_kde_lower_dc(x)), x_dc_lim_low, x_dc_lim_mid)[0] dint_p_0toLM = abs(int_p_0toLM_upper - int_p_0toLM_lower) int_p_LMto2LM = quad(lambda x: np.exp(f_log_kde_dc(x)), x_dc_lim_mid, x_dc_lim_hi)[0] int_p_LMto2LM_upper = quad(lambda x: np.exp(f_log_kde_upper_dc(x)), x_dc_lim_mid, x_dc_lim_hi)[0] int_p_LMto2LM_lower = quad(lambda x: np.exp(f_log_kde_lower_dc(x)), x_dc_lim_mid, x_dc_lim_hi)[0] dint_p_LMto2LM = abs(int_p_LMto2LM_upper - int_p_LMto2LM_lower) int_p = int_p_0toLM + 0.5*int_p_LMto2LM dint_p = np.sqrt(dint_p_0toLM**2 + 0.5*dint_p_LMto2LM**2) DCR = mean_p_interpeak/(max(int_p, self._eps)*4*tau) dDCR = DCR*np.sqrt((dmean_p_interpeak/max(mean_p_interpeak, self._eps))**2 + (dint_p/max(int_p, self._eps))**2) mu_dcr= DCR*(abs(t_0) + tgate) k_dcr_low = 0 k_dcr_hi = self.GPoisPPF(ppf_mainfit, mu_dcr, self._GD_data["fit_GP_lbda"], self._max_n_dcr_peaks ) self._DCR_data["x_dc_kde"] = x_dc_kde self._DCR_data["y_dc_kde"] = y_dc_kde self._DCR_data["y_dc_kde_err"] = y_dc_kde_err self._DCR_data["cond_p_0toLM"] = cond_p_0toLM self._DCR_data["cond_p_LMtoL2M"] = cond_p_LMto2LM self._DCR_data["cond_interpeak"] = cond_interpeak self._DCR_data["int_p_0toLM"] = int_p_0toLM self._DCR_data["dint_p_0toLM"] = dint_p_0toLM self._DCR_data["int_p_LMto2LM"] = int_p_LMto2LM self._DCR_data["dint_p_LMto2LM"] = dint_p_LMto2LM self._DCR_data["mean_p_interpeak"] = mean_p_interpeak self._DCR_data["dmean_p_interpeak"] = dmean_p_interpeak self._DCR_data["fit_tau"] = tau self._DCR_data["fit_tgate"] = tgate self._DCR_data["fit_t_0"] = t_0 self._DCR_data["fit_DCR"] = DCR self._DCR_data["fit_DCR_err"] = dDCR self._DCR_data["fit_dcr_k_low"] = k_dcr_low self._DCR_data["fit_dcr_k_hi"] = k_dcr_hi if(r_fast is None): self._DCR_data["fit_r_fast"] = 0 else: self._DCR_data["fit_r_fast"] = r_fast if(self._DCR_data["fit_dcr_k_hi"]<2): print("Too few dark peaks to fit. 
Artificially increasing max number of peaks from {:d} to 2...".format(self._DCR_data["fit_dcr_k_hi"])) self._DCR_data["fit_dcr_k_hi"] = max(self._DCR_data["fit_dcr_k_hi"], 2) elif(self._DCR_data["fit_dcr_k_hi"]>self._max_n_peaks): print("Too many dark peaks to fit. Artificially decreasing max number of peaks from {:d} to {:d}...".format(self._DCR_data["fit_dcr_k_hi"], self._max_n_peaks)) self._DCR_data["fit_dcr_k_hi"] = min(self._DCR_data["fit_dcr_k_hi"], self._max_n_dcr_peaks) def InitFit(self, data, tau, t_0, r_fast, tgate, pedestal, n_bootstrap, n_bootstrap_kde, bw, peak_dist_factor, peak_width_factor, prominence, min_n_peak_evts, kernel_kde, bandwidth_kde, ppf_mainfit, bin_method ): if(self._is_histogram): data = np.array(list(chain.from_iterable([[x]*int(y) for x,y in zip(data[:,0], data[:,1])]))) if(not self._failed): if(self._verbose): print("Getting histogram and peak values...") self.GetHistAndPeaks( data, bw, peak_dist_factor, peak_width_factor, pedestal, prominence, bin_method, n_bootstrap, n_bootstrap_kde, bandwidth_kde, kernel_kde, min_n_peak_evts) if(not self._failed): if(self._verbose): print("Fitting Generalised Poisson distribution...") self.PreFitGenPoisToPeaks(ppf_mainfit, n_bootstrap) if(not self._failed): if(self._verbose): print("Fitting peak widths via bootstrap resampling...") self.PreFitSigmaAndGain(data, n_bootstrap) if(not self._failed): if(self._verbose): print("Estimating DCR from interpeak region...") self.PreFitDCR(data, tau, tgate, r_fast, t_0, ppf_mainfit, kernel_kde, bandwidth_kde, n_bootstrap_kde) if(not self._failed): if(self._verbose): print("Prefitting complete.") # FIT def Fit(self, data, **kwargs): kwargs_fit = self._default_fit_kwargs.copy() kwargs_fit.update(kwargs) if(not isinstance(data,np.ndarray)): raise Exception("Data is in {0} format. Please provide data in the format of a NumPy array.".format(type(data))) else: if(data.ndim==1): if(self._verbose): print("Data is 1D. 
Assuming list of charges as input...") self._is_histogram = False elif(data.ndim==2): if(self._verbose): print("Data is 2D. Assuming histogram as input...") self._is_histogram = True else: raise Exception("Please provide data in the format of either a list of charges (1D) or a histogram (2D)") if(kwargs_fit["t_0"] is None): raise Exception("t_0 is not set. Please set t_0 as a keyword argument to be the integration period before gate in nanoseconds.") if(kwargs_fit["tgate"] is None): raise Exception("tgate is not set. Please set tgate as a keyword argument to be the integration gate length in nanoseconds.") if(kwargs_fit["tau"] is None): raise Exception("tau is not set. Please set tau as a keyword argument to be the slow component of the SiPM pulse in nanoseconds.") if(kwargs_fit["r_fast"] is None): raise Exception("r_fast is not set. Please set r_fast as a keyword argument to be the fraction of the fast component of the SiPM pulse in nanoseconds.") if(kwargs_fit["pedestal"] is None): raise Exception("pedestal is not set. 
Please set pedestal to be the pedestal position in units of the input charge spectrum being fitted.") print("Performing fit initialisation...") self.Init() self.InitFit(data, **kwargs_fit) if(not self._failed): print("Fitting...") self.N = len(data) G = self._GD_data["fit_G"] dG = self._GD_data["fit_G_err"] mu = self._GD_data["fit_GP_mu"] dmu = self._GD_data["fit_GP_mu_err"] lbda = self._GD_data["fit_GP_lbda"] dlbda = self._GD_data["fit_GP_lbda_err"] k_low = self._GD_data["fit_GP_k_low"] k_hi = self._GD_data["fit_GP_k_hi"] x_0 = self._GD_data["fit_x_0"] dx_0 = self._GD_data["fit_x_0_err"] sigma_0 = self._GD_data["fit_sigma0"] sigma_1 = self._GD_data["fit_sigma1"] dsigma_0 = self._GD_data["fit_sigma0_err"] dsigma_1 = self._GD_data["fit_sigma1_err"] dx = self._GD_data["bw_s"] tau = self._DCR_data["fit_tau"] tgate = self._DCR_data["fit_tgate"] t_0 = self._DCR_data["fit_t_0"] DCR = self._DCR_data["fit_DCR"] dDCR = self._DCR_data["fit_DCR_err"] r_fast = self._DCR_data["fit_r_fast"] k_dcr_low = self._DCR_data["fit_dcr_k_low"] k_dcr_hi =self._DCR_data["fit_dcr_k_hi"] # beta is set to G/2 good guess self.fit_dict = { "mu":mu, "G":G, "x_0": x_0, "sigma_0": sigma_0, "sigma_1": sigma_1, "alpha": 0.2, "r_beta": 0.2, "lbda":lbda, "tau": tau, "tgate":tgate, "t_0": t_0, "DCR":DCR, "r_fast":r_fast, "k_low":k_low, "k_hi":k_hi, "k_dcr_low":k_dcr_low, "k_dcr_hi":k_dcr_hi, "dx": dx, } bl = BinnedLH(self.DRM, self._GD_data["x_s"], self._GD_data["y_s"]) minuit = Minuit(bl, **self.fit_dict) minuit.errors["alpha"] = 0.01 minuit.errors["r_beta"] = 0.01 if(not np.isnan(dmu) and dmu is not None): minuit.errors["mu"] = abs(dmu) if(not np.isnan(dG) and dG is not None): minuit.errors["G"] = abs(dG) if(not np.isnan(dx_0) and dx_0 is not None): minuit.errors["x_0"] = abs(dx_0) if(not np.isnan(dlbda) and dlbda is not None): minuit.errors["lbda"] = abs(dlbda) if(not np.isnan(dsigma_0) and dsigma_0 is not None): minuit.errors["sigma_0"] = abs(dsigma_0) if(not np.isnan(dsigma_1) and dsigma_1 is not 
None): minuit.errors["sigma_1"] = abs(dsigma_1) if(not np.isnan(dDCR) and dDCR is not None): minuit.errors["DCR"] = abs(dDCR) minuit.limits["mu"] = (self._eps, max(1, k_hi)) minuit.limits["G"] = (self._eps, None) minuit.limits["x_0"] = (x_0-G/2, x_0+G/2) minuit.limits["alpha"] = (self._eps, 1-self._eps) minuit.limits["r_beta"] = (self._eps, 1-self._eps) minuit.limits["lbda"] = (self._eps, 1-self._eps) minuit.limits["sigma_0"] = (self._eps, G) minuit.limits["sigma_1"] = (self._eps, G) minuit.limits["DCR"] = (self._eps, None) minuit.fixed["k_low"] = True minuit.fixed["k_hi"] = True minuit.fixed["tgate"] = True minuit.fixed["tau"] = True minuit.fixed["t_0"] = True minuit.fixed["r_fast"] = True minuit.fixed["dx"] = True minuit.fixed["k_dcr_low"] = True minuit.fixed["k_dcr_hi"] = True minuit.fixed["tgate"] = True minuit.strategy=2 minuit.errordef=minuit.LIKELIHOOD minuit.migrad(ncall= self._n_call_minuit, iterate =self._n_iterations_minuit) minuit.hesse() if(self._verbose): print(minuit) self._fit_minuit = minuit self._fit_values_s = minuit.values.to_dict() self._fit_errors_s = minuit.errors.to_dict() self._fit_values = {} self._fit_errors = {} for key, value in self._fit_values_s.items(): if(key=="x_0"): self._fit_values[key] = value*self.scaler.scale_[0] + self.scaler.center_[0] elif(key=="G"): self._fit_values[key] = value*self.scaler.scale_[0] elif(key=="sigma_0"): self._fit_values[key] = value*self.scaler.scale_[0] elif(key=="sigma_1"): self._fit_values[key] = value*self.scaler.scale_[0] elif(key=="dx"): self._fit_values[key] = self._GD_data["bw"] else: self._fit_values[key] = value for key, value in self._fit_errors_s.items(): if(key=="x_0"): self._fit_errors[key] = value*self.scaler.scale_[0] + self.scaler.center_[0] elif(key=="G"): self._fit_errors[key] = value*self.scaler.scale_[0] elif(key=="sigma_0"): self._fit_errors[key] = value*self.scaler.scale_[0] elif(key=="sigma_1"): self._fit_errors[key] = value*self.scaler.scale_[0] elif(key=="dx"): 
self._fit_errors[key] = 0.1*self._GD_data["bw"] else: self._fit_errors[key] = value data = [] prms = [] for par in self._fit_minuit.parameters: if(self._fit_minuit.fixed[par]): continue prms.append(par) data.append([self._fit_values[par], self._fit_errors[par]]) data = np.asarray(data) prms = np.asarray(prms) _print = pd.DataFrame(columns = prms, index=["Values", "Errors"], data = data.T).T _print.style.format({ 'Values': lambda val: f'{val:,.2E}', 'Errors': lambda val: f'{val:,.2E}' })