Select Git revision
demo_final.Rproj
-
Bartke, Simon authored
R Markdown with custom YAML created
Bartke, Simon authoredR Markdown with custom YAML created
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
LightSimtastic.py 22.46 KiB
import warnings
import time
import numpy as np
import pandas as pd
import scipy
import scipy.stats as stats
import scipy.signal as signal
from tqdm.auto import tqdm
from itertools import chain, product, zip_longest
from operator import add
from types import SimpleNamespace
from AdditionalPDFs import borel
import codecs
np.set_printoptions(suppress=True)
# evil but works: just suppress the numpy nested ragged array warning
if hasattr(np, 'VisibleDeprecationWarning'):
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)
else:
warnings.filterwarnings("ignore", category=DeprecationWarning)
class SiPMSimulation:
def __init__(self):
'"Name_Sim"', '"mu"', '"ppXT"', '"pdXT"', '"taudXT"', '"rdXT"', '"pAp"', '"tauAp"', '"taur"', '"Sig0"', '"Sig1"', '"DCR"', '"Sigt"', '"GSig"', '"tslow"', '"tfast"', '"rfast"', '"start_tgate"', '"len_tgate"', '"t0"', '"tl0"', '"tl1"', '"tl2"',
self.VariableDictionary = {
"Name_Sim": {"Description": "Name for the simulation.", "Unit": None, },
"mu": {"Description": "Mean Number of Primary Geiger discharges in SiPM Cell", "Unit": "Units of Charge, Normalized to Gain [Q/G]"},
"ppXT": {"Description": "Characteristic Probability of Prompt Cross Talk in SiPM Cell", "Unit": None},
"pdXT": {"Description": "Characteristic Probability of Delayed Cross Talk in SiPM Cell", "Unit": None},
"taudXT": {"Description": "Time Constant of Delayed Cross Talk in SiPM Cell", "Unit": "Nanoseconds [ns]"},
"rdXT": {"Description": " Fraction of Delayed Cross Talk from Adjacent SiPM Cells", "Unit": None},
"pAp": {"Description": " Probability of Afterpulse occurrence in SiPM Cell", "Unit": None},
"tauAp": {"Description": " Time delay constant of Afterpulsing in SiPM Cell", "Unit": None},
"taur": {"Description": " PDE Recovery constant in SiPM Cell", "Unit": None},
"Sig0": {"Description": "Fluctuation in Gain of Geiger Discharge", "Unit": "Units of Charge, Normalized to Gain [Q/G]"},
"Sig1": {"Description": " Pedestal width due to Noise from Electronic Components", "Unit": "Units of Charge, Normalized to Gain [Q/G]"},
"DCR": {"Description": "Dark Count Rate", "Unit": "Gigahertz [GHz]"},
"Sigt": {"Description": "Electronics Noise for Current Transient", "Unit": "Nanoseconds [ns]"},
"GSig": {"Description": "Width of Electronics Transfer Function", "Unit": "Nanoseconds [ns]"},
"tslow": {"Description": "Time constant of Slow Component of SiPM pulse shape.", "Unit": "Nanoseconds [ns]"},
"tfast": {"Description": "Time constant of Fast Component of SiPM pulse shape.", "Unit": "Nanoseconds [ns]"},
"rfast": {"Description": "Fraction of Fast Component contribution to SiPM pulse shape.", "Unit": None},
"start_tgate": {"Description": "Start time of SiPM Integration Window", "Unit": "Nanoseconds [ns]"},
"len_tgate": {"Description": "Duration of SiPM Integration Window", "Unit": "Nanoseconds [ns]"},
"t0": {"Description": "Duration before Dark Count Evaluation", "Unit": "Nanoseconds [ns]"},
"tl0": {"Description": "Free Parameter 0 for Light Pulse Distribution", "Unit": "Nanoseconds [ns]"},
"tl1": {"Description": "Free Parameter 1 for Light Pulse Distribution", "Unit": "Nanoseconds [ns]"},
"tl2": {"Description": "Free Parameter 2 for Light Pulse Distribution", "Unit": "Nanoseconds [ns]"},
"Gen_mu": {"Description": "Function for the random sampling of the number of Primary Gieger Discharges", "Unit": None, "Options": ["Poisson", "Gauss", "Fixed"]},
"Gen_tmu": {"Description": "Function for the random sampling of the time of primary Gieger discharges", "Unit": None, "Options": ["Gauss", "Exp"]},
"Gen_gain": {"Description": "Function for the random sampling of the SiPM Gain", "Unit": None, "Options": ["Gauss"]},
"Gen_npXT": {"Description": "Function for the random sampling of the prompt cross-talk", "Unit": None, "Options": ["Poisson", "Binom", "Borel"]},
"Gen_ndXT": {"Description": "Function for the random sampling of the number delayed cross-talk discharges", "Unit": None, "Options": ["Poisson", "Binom", "Borel"]},
"Gen_tdXT": {"Description": "Function for the random sampling of the time of delayed cross-talk discharges", "Unit": None, "Options": ["Exp"]}
}
self.pb_ag_text = ["Sampling Primary Geiger Distributions...", "Sampling SiPM Noise Distributions...",
"Calculating Prompt Cross Talk...", "Calculating Delayed Cross Talk...",
"Calculating Afterpulses...", "Combining SiPM Noise...", "All Geiger Discharges Calculated.",]
self.pb_Q_text = ["Sampling Charge Distributions...",
"Calculating Charge from Geiger Discharges...", "Charge Spectrum Calculated."]
self.pb_I_text = ["Sampling Current Distributions...", "Calculating Noise from Geiger Discharges...",
"Convolving Electronic Noise Function...", "Transient Calculated."]
self.out_columns = ["GeigerArray", "ChargeSpectrum", "Transients", "TimeElapsed_GeigerArray",
"TimeElapsed_ChargeSpectrum", "TimeElapsed_Transients", "NSim"]
self.pars = pd.DataFrame(columns=list(
self.VariableDictionary.keys()) + self.out_columns)
self.pars["GeigerArray"] = self.pars["GeigerArray"].astype(object)
self.pars["ChargeSpectrum"] = self.pars["ChargeSpectrum"].astype(object)
self.pars["Transients"] = self.pars["Transients"].astype(object)
def ReadMathCad(self, inputfile):
variables = {}
with codecs.open(inputfile, encoding='utf-8') as file:
def _SetFloating(string):
try:
string = float(string)
return string
except ValueError:
return string
def _ToFloats(list_strings):
return [_SetFloating(string) for string in list_strings]
def _RemoveNonAscii(string):
return "".join(i for i in string if ord(i) < 126 and ord(i) > 31)
lines = file.read()
for line in lines.splitlines(True):
line = _RemoveNonAscii(line)
elems = line.replace('"', "").split(",")
elems = _ToFloats(elems)
if elems[0] in self.VariableDictionary:
variables[elems[0]] = elems[1::]
self.AddVariables(variables)
def ClearVariables(self):
self.pars = self.pars.iloc[0:0]
def AddVariables(self, variables):
if not isinstance(variables, dict):
raise Exception("Expected a dictionary of parameters. Please enter a dictionary with all of the following elements: {0}.".format(", ".join(list(self.VariableDictionary.keys()))))
missing = np.setdiff1d(
list(self.VariableDictionary.keys()), list(variables.keys()))
if (len(missing) > 0):
raise Exception("Variables are missing from the entered dictionary. Please add following elements: {0}.".format(", ".join(list(missing))))
for key, value in variables.items():
if not isinstance(value, list):
raise Exception("Expected a list of simulation parameters, but a type {0} was found for variable {1}. Please enter your simulation parameters in a list.".format(type(value), key))
self.ClearVariables()
for key, value in variables.items():
self.pars[key] = value
def GetInformation(self, key):
try:
temp = pd.DataFrame.from_dict(self.VariableDictionary[key], orient='index')
print(temp)
except:
raise Exception("Parameter not found. Available parameters are: {0}".format(", ".join(list(self.VariableDictionary.keys()))))
def PrintVariables(self, latex=False):
if (latex):
df_var = pd.DataFrame.from_dict(self.VariableDictionary)
print(df_var.to_latex())
else:
print(self.pars)
def dXTtSplit(self, dXT_nt, dXT_n):
output = []
partialSum = 0
for _i in dXT_n:
temp = []
for _ii in _i:
temp.append(dXT_nt[partialSum:partialSum+_ii])
partialSum += _ii
output.append(temp)
return output
def AllGeiger(self, ns):
pbar = tqdm(range(len(self.pb_ag_text)-1))
pbar.set_description("{0}".format(self.pb_ag_text[0]))
nlight = np.asarray(ns.dist_flight.rvs(size=ns.n_sim))
nDC = np.asarray(ns.dist_fDC.rvs(size=ns.n_sim))
if (len(nlight) > len(nDC)):
nDC.resize(nlight.shape)
elif (len(nDC) > len(nlight)):
nlight.resize(nDC.shape)
nPG = nDC + nlight
pbar.update(1)
pbar.set_description("{0}".format(self.pb_ag_text[1]))
nlight_sum = np.sum(nlight)
nDC_sum = np.sum(nDC)
nPG_sum = nlight_sum + nDC_sum
npXT = np.array(ns.dist_fpXT.rvs(size=nPG_sum))
ndXT = np.array(ns.dist_fdXT.rvs(size=nPG_sum))
npXT_sum = np.sum(npXT)
nPG_pXT_sum = nPG_sum + npXT_sum
pg_l_d = np.ones(nlight_sum)
pg_l_t = np.asarray(ns.dist_ftlight.rvs(size=nlight_sum))
pg_dc_d = np.ones(nDC_sum)
pg_dc_t = np.asarray(ns.dist_ftDC.rvs(size=nDC_sum))
nlight_cumsum = np.cumsum(nlight)
nDC_cumsum = np.cumsum(nDC)
nPG_cumsum = np.cumsum(nPG)
ndXT_cumsum = np.cumsum(ndXT)
pg_l_d = np.split(pg_l_d, nlight_cumsum)[:-1]
pg_l_t = np.split(pg_l_t, nlight_cumsum)[:-1]
pg_dc_d = np.split(pg_dc_d, nDC_cumsum)[:-1]
pg_dc_t = np.split(pg_dc_t, nDC_cumsum)[:-1]
pXT_n = np.split(npXT, nPG_cumsum)[:-1]
dXT_n = np.split(ndXT, nPG_cumsum)[:-1]
pg_d = [np.concatenate(elem) for elem in zip(pg_l_d, pg_dc_d)]
pg_t = [np.concatenate(elem) for elem in zip(pg_l_t, pg_dc_t)]
pbar.update(1)
pbar.set_description("{0}".format(self.pb_ag_text[2]))
pXT_d = [np.array(list(chain.from_iterable([[elem_pg_d]*elem_pXT_n
if (elem_pXT_n > 0 and elem_pg_d > 0)
else []
for elem_pg_d, elem_pXT_n in zip(list_pg_d, list_pXT_n)])))
for list_pg_d, list_pXT_n in zip(pg_d, pXT_n)]
pXT_t = [np.array(list(chain.from_iterable([[elem_pg_t]*elem_pXT_n
if (elem_pXT_n > 0 and elem_pg_d > 0)
else []
for elem_pg_d, elem_pg_t, elem_pXT_n in zip(list_pg_d, list_pg_t, list_pXT_n)])))
for list_pg_d, list_pg_t, list_pXT_n in zip(pg_d, pg_t, pXT_n)]
pbar.update(1)
pbar.set_description("{0}".format(self.pb_ag_text[3]))
dXT_d = [np.array(list(chain.from_iterable([[elem_pg_d]*elem_dXT_n
if (elem_dXT_n > 0 and elem_pg_d > 0)
else []
for elem_pg_d, elem_dXT_n in zip(list_pg_d, list_dXT_n)])))
for list_pg_d, list_dXT_n in zip(pg_d, dXT_n)]
dXT_t = [np.array(list(chain.from_iterable([list(map(add, [elem_pg_t]*elem_dXT_n,ns.dist_ftdXT.rvs(size=elem_dXT_n)))
if (elem_dXT_n > 0 and elem_pg_d > 0)
else []
for elem_pg_d, elem_pg_t, elem_dXT_n in zip(list_pg_d, list_pg_t, list_dXT_n)])))
for list_pg_d, list_pg_t, list_dXT_n in zip(pg_d, pg_t, dXT_n)]
pbar.update(1)
pbar.set_description("{0}".format(self.pb_ag_text[4]))
apg_d = [np.concatenate(elem) for elem in zip(pg_d, pXT_n, dXT_n)]
apg_t = [np.concatenate(elem) for elem in zip(pg_t, pXT_t, dXT_t)]
Ap_d = []
Ap_t = []
for list_apg_d, list_apg_t in zip(apg_d, apg_t):
_Apd_list = []
_Apt_list = []
for d, t in zip(list_apg_d, list_apg_t):
if (d > 0):
_tAp = ns.dist_ftAp.rvs(size=int(d))
_nAp = stats.binom.rvs(
n=np.ones(int(d)).astype(int),
p=(ns.pAp*(1 - np.exp(-_tAp/ns.taur))),
size=int(d))
cond_sel = (_nAp > 0)
if (sum(cond_sel) > 0):
_tAp = _tAp[cond_sel]
_nAp = _nAp[cond_sel]
_Apd = (1-np.exp(-_tAp/ns.tslow))*_nAp
_Apt = t + _tAp
else:
_Apd = []
_Apt = []
else:
_Apd = []
_Apt = []
_Apd_list.extend(list(_Apd))
_Apt_list.extend(list(_Apt))
Ap_d.append(_Apd_list)
Ap_t.append(_Apt_list)
Ap_d = np.array(Ap_d, dtype="object")
Ap_t = np.array(Ap_t, dtype="object")
pbar.update(1)
pbar.set_description("{0}".format(self.pb_ag_text[5]))
ag_d = [np.concatenate((list_pg_d, list_pXT_d, list_dXT_d, list_Ap_d))
for list_pg_d, list_pXT_d, list_dXT_d, list_Ap_d, in zip(pg_d, pXT_d, dXT_d, Ap_d)]
ag_t = [np.concatenate((list_pg_t, list_pXT_t, list_dXT_t, list_Ap_t))
for list_pg_t, list_pXT_t, list_dXT_t, list_Ap_t, in zip(pg_t, pXT_t, dXT_t, Ap_t)]
ns.ag_d = np.asarray(ag_d, dtype="object")
ns.ag_t = np.asarray(ag_t, dtype="object")
pbar.update(1)
pbar.set_description("{0}".format(self.pb_ag_text[6]))
def QsG(self, ns, t):
tslow = ns.tslow
tfast = ns.tfast
rfast = ns.rfast
tstart = ns.start_tgate
tend = ns.start_tgate + ns.len_tgate
if (t < tstart):
QsG_slow = (1 - rfast)*(np.exp(-(tstart - t)/tslow) - np.exp(-(tend - t)/tslow))
QsG_fast = (rfast)*(np.exp(-(tstart - t)/tfast) - np.exp(-(tend - t)/tfast))
QsG = QsG_slow + QsG_fast
elif (t >= tstart and t < tend):
QsG_slow = (1 - rfast)*np.exp(-(tend - t)/tslow)
QsG_fast = (rfast)*np.exp(-(tend - t)/tfast)
QsG = 1 - QsG_slow - QsG_fast
else:
QsG = 0
return QsG
def IsG(self, ns, t):
Islow = ((1 - ns.rfast)/ns.tslow)*(np.exp(-t/ns.tslow))
Ifast = (ns.rfast/ns.tfast)*(np.exp(-t/ns.tfast))
IsG = Islow + Ifast
return IsG
def QSiPM(self, ns):
pbar = tqdm(range(len(self.pb_Q_text)-1))
pbar.set_description("{0}".format(self.pb_Q_text[0]))
Q = np.array(ns.dist_Q.rvs(ns.n_sim))
n_ag_d = np.asarray([len(list_ag_d) for list_ag_d in ns.ag_d])
Sig1 = ns.dist_Sig1.rvs(np.sum(n_ag_d))
n_ag_d_cumsum = np.cumsum(n_ag_d)[:-1]
ns.Sig1 = np.split(Sig1, n_ag_d_cumsum)
pbar.update(1)
pbar.set_description("{0}".format(self.pb_Q_text[1]))
Q_SiPM = np.array([np.sum(np.asarray([elem_Sig1*elem_ag_d*self.QsG(ns, elem_ag_t)
if (elem_ag_d > 0)
else 0
for elem_Sig1, elem_ag_d, elem_ag_t in zip(list_Sig1, list_ag_d, list_ag_t)]))
for list_Sig1, list_ag_d, list_ag_t in zip(ns.Sig1, ns.ag_d, ns.ag_t)])
Q += Q_SiPM
pbar.update(1)
pbar.set_description("{0}".format(self.pb_Q_text[2]))
pbar.close()
return Q
def ISiPM(self, ns, ntim=1000, n_transients=200, convolve=True):
_range = [-ns.t0, ns.start_tgate+ns.len_tgate]
dtim = int(np.ceil((_range[1] - _range[0])/ntim-1))
tim = np.linspace(_range[0], _range[1], ntim)
I = ns.dist_I.rvs(n_transients*len(tim))
I = np.array(np.split(I, len(tim))).T
Gt = ns.dist_Gt.pdf(tim)
n_ag_d = np.asarray([len(list_ag_d) for list_ag_d in ns.ag_d])
Sig1 = ns.dist_Sig1.rvs(np.sum(n_ag_d))
n_ag_d_cumsum = np.cumsum(n_ag_d)[:-1]
Sig1 = np.split(Sig1, n_ag_d_cumsum)
ns.Sig1 = Sig1
ag_d_temp = ns.ag_d[0:int(n_transients)]
ag_t_temp = ns.ag_t[0:int(n_transients)]
Sig1_temp = ns.Sig1[0:int(n_transients)]
I_SiPM = np.array([np.sum(np.asarray([elem_Sig1*elem_ag_d * self.IsG(ns, elem_tim - elem_ag_t)
if (elem_ag_d > 0 and elem_ag_t < elem_tim)
else 0
for (elem_Sig1, elem_ag_d, elem_ag_t), elem_tim in product(zip(list_Sig1,list_ag_d,list_ag_t), tim)]).reshape(len(list_ag_d), ntim), axis=0)
for list_Sig1, list_ag_d, list_ag_t in zip(tqdm(Sig1_temp), ag_d_temp, ag_t_temp)])
I += I_SiPM
I = np.apply_along_axis(signal.fftconvolve, 1, I, Gt, 'same')
return I, tim
def Simulate(self,n_sim,transients=False,n_transients=None,verbose=False):
n_sim = int(n_sim)
for index, row in self.pars.iterrows():
dictionary = row.to_dict()
if (verbose):
print("\n-------------------\n")
print("\nSIMULATION {0}\n - {1}".format(index, row["Name_Sim"]))
print("\n-------------------\n")
for key, value in dictionary.items():
if (key != "Name_Sim" and key not in self.out_columns):
print("{0} \t\t {1} {2}".format(key, value, self.VariableDictionary[key]["Unit"]))
print("\n-------------------\n")
ns = SimpleNamespace(**dictionary)
if (ns.Gen_mu == "Poisson"):
ns.dist_flight = scipy.stats.poisson(mu=ns.mu)
elif (ns.Gen_mu == "Fixed"):
ns.dist_flight = scipy.stats.uniform(loc=ns.mu, scale=0)
if (ns.Gen_tmu == "Gauss"):
ns.dist_ftlight = scipy.stats.norm(loc=ns.tl0, scale=ns.tl1)
elif (ns.Gen_tmu == "Exp"):
ns.dist_ftlight = scipy.stats.expon(loc=ns.tl0, scale=ns.tl1)
else:
raise Exception("The Time for Primary Geiger Discharge distribution mode that has been selected is invalid. Please choose from the following options: {0}".format(", ".join(self.VariableDictionary["Gen_tmu"]["options"])))
ns.dist_fDC = scipy.stats.poisson(mu=(ns.t0 + ns.start_tgate + ns.len_tgate)*ns.DCR)
ns.dist_ftDC = scipy.stats.uniform(loc=-ns.t0, scale=ns.t0+ns.start_tgate+ns.len_tgate)
if (ns.Gen_npXT == "Poisson"):
ns.dist_fpXT = scipy.stats.poisson(mu=ns.ppXT)
elif (ns.Gen_npXT == "Binomial"):
ns.dist_fpXT = scipy.stats.binom(n=1, p=ns.ppXT)
elif (ns.Gen_npXT == "Borel"):
ns.dist_fpXT = borel(mu=ns.ppXT)
else:
raise Exception("The Prompt Cross-talk Discharge distribution mode that has been selected is invalid. Please choose from the following options: {0}".format(", ".join(self.VariableDictionary["Gen_npXT"]["options"])))
if (ns.Gen_ndXT == "Poisson"):
ns.dist_fdXT = scipy.stats.poisson(mu=ns.pdXT)
elif (ns.Gen_ndXT == "Binomial"):
ns.dist_fdXT = scipy.stats.binom(n=1, p=ns.pdXT)
elif (ns.Gen_ndXT == "Borel"):
ns.dist_fdXT = borel(mu=ns.pdXT)
else:
raise Exception("The Delayed Cross-talk Discharge mode that has been selected is invalid. Please choose from the following options: {0}".format(", ".join(self.VariableDictionary["Gen_ndXT"]["options"])))
if (ns.Gen_tdXT == "Exp"):
ns.dist_ftdXT = scipy.stats.expon(scale=(ns.taudXT))
else:
raise Exception("The Time of Delayed Cross-talk Discharge mode that has been selected is invalid. Please choose from the following options: {0}".format(", ".join(self.VariableDictionary["Gen_tdXT"]["options"])))
ns.dist_ftAp = scipy.stats.expon(scale=(ns.tauAp))
ns.dist_Q = scipy.stats.norm(loc=0, scale=ns.Sig0)
ns.dist_Sig1 = scipy.stats.norm(loc=1, scale=ns.Sig1)
ns.dist_I = scipy.stats.norm(loc=0, scale=ns.Sigt)
ns.dist_Gt = scipy.stats.norm(loc=0, scale=ns.GSig)
ns.n_sim = n_sim
self.pars.at[index, "NSim"] = n_sim
start_time = time.time()
self.AllGeiger(ns)
end_time = time.time()
start_time = time.time()
Qs = self.QSiPM(ns)
end_time = time.time()
self.pars.at[index, "GeigerArray"] = np.vstack([ns.ag_d, ns.ag_t])
# self.pars.at[index, "TimeElapsed_GeigerArray"] = end_time-start_time
self.pars.at[index, "ChargeSpectrum"] = Qs
self.pars.at[index, "TimeElapsed_ChargeSpectrum"] = end_time-start_time
if (transients):
start_time = time.time()
I, t = self.ISiPM(ns, n_transients=n_transients)
# I, t = self.ISiPM(ns, ntim=6001, n_transients=n_transients)
end_time = time.time()
Trans = {"I": I, "t": t}
self.pars.at[index, "Transients"] = Trans
self.pars.at[index, "TimeElapsed_Transients"] = end_time-start_time
def WriteDataFrame(self, name, directory, ext=None):
if (self.pars.empty):
raise Exception("The simulation data is empty. Please simulate an SiPM before writing a file.")
if (ext is None):
outdir = directory + name + ".pkl"
self.pars.to_pickle(outdir)
else:
outdir = directory + name + ext
if (ext == ".pkl"):
self.pars.to_pickle(outdir)
elif (ext == ".hdf"):
self.pars.to_hdf(outdir)
elif (ext == ".csv"):
self.pars.to_csv(outdir)
elif (ext == ".json"):
self.pars.to_json(outdir)
else:
raise Exception("The file format was not recognized. Please choose an output file extension .pkl, .hdf, .csv or .json ")
def WriteASCII(self, name, directory):
if (self.pars.empty):
raise Exception("The simulation data is empty. Please simulate an SiPM before writing a file.")
variables = self.pars.drop(['ChargeSpectrum'], axis=1)
Qs = self.pars["ChargeSpectrum"]
with open(directory + name + ".head", 'a') as f:
f.write(variables.to_string(header=True, index=False))
with open(directory + name + ".dat", 'a') as f:
for _elem in zip_longest(*Qs):
formatted = (["{:.3f}".format(__elem) if __elem is not None else " " for __elem in _elem])
formatted = ','.join(formatted)
f.write(formatted + '\n')