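# NOTE: the next five lines are residue from the GitLab web view this file was
# copied from; they are not part of the script.
# Select Git revision
# ackwork_test.go
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# maxwell_integrate_to_h5.py 11.03 KiB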
#!/usr/bin/env python3
import pyFAI
import fabio
import numpy as np
import os
import sys
from time import sleep
from glob import glob
import logging
# Grab the root logger (getLogger() without a name) and raise its threshold to
# CRITICAL, so only critical records pass through it.
# NOTE(review): presumably meant to silence warnings from the imported
# libraries -- confirm this is still wanted while debugging.
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)
import threading
from watchdog.observers.polling import PollingObserver
from watchdog.events import PatternMatchingEventHandler
from multiprocessing.pool import ThreadPool as Pool
import pandas as pd
global NPROC
def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
    """
    Azimuthally integrate all files found below <path_im> and store the
    patterns of every source directory as one table.

    The directory tree below <path_im> is mirrored below <path_int>. For each
    source directory that contains usable files, one CSV and one HDF5 file
    (key 'data') named after the output directory are written into the
    mirrored directory. Each row holds "q", "I", "dI" and "filename" of one
    image, sorted case-insensitively by filename.

    Files are skipped when their name contains one of the FORBIDDEN
    substrings, when their full path contains "metadata", or when a pattern
    file <basename><dtype_int> already exists in the output directory.

    Reads the module-level settings NPROC, FORBIDDEN, NPT, UNIT, POLARIZATION
    and ERRORMODE as well as the module-level objects ``ai`` and ``mask``.

    :param 'str' path_im: root directory that is searched recursively for images
    :param 'str' path_int: root directory that receives the result tables
    :param 'str' dtype_im: filename ending of image files; only used to derive
        the name of the pattern file for the "already integrated" check
    :param 'str' dtype_int: filename ending of pattern files
    """
    # Group the candidate files by source directory in os.walk order. A dict
    # keeps the order deterministic and avoids repeated list searches.
    images_by_dir = {}
    for path, _subdirs, files in os.walk(path_im):
        selected = [
            os.path.join(path, name)
            for name in files
            if not any(forbidden in name for forbidden in FORBIDDEN)
        ]
        if selected:
            images_by_dir[path] = selected

    if not images_by_dir:
        # Nothing to do; do not spawn worker threads for an empty tree.
        return

    def integration_thread(fname_im, path_int):
        """Integrate one image; return its result dict or None if skipped."""
        basename_int = os.path.basename(fname_im)[:-len(dtype_im)] + dtype_int
        fname_int = os.path.join(path_int, basename_int)
        if os.path.isfile(fname_int):
            # Already integrated: skip before paying for the image read.
            return None

        im = fabio.open(fname_im).data
        integrate_kwargs = dict(
            data=im,
            npt=NPT,
            mask=mask,
            polarization_factor=POLARIZATION,
            correctSolidAngle=True,
            error_model=ERRORMODE,
            unit=UNIT,
        )
        if ERRORMODE == "none":
            # Without an error model only (q, I) is returned; pad dI with zeros
            # so every row has the same columns.
            q, I = ai.integrate1d(**integrate_kwargs)
            dI = np.zeros_like(I)
        else:
            q, I, dI = ai.integrate1d(**integrate_kwargs)
        return {
            "q": q,
            "I": I,
            "dI": dI,
            "filename": fname_im,
        }

    # One pool serves all directories; the context manager releases its
    # threads even if an integration raises.
    with Pool(int(NPROC)) as pool:
        for subdir, subdir_fnames in images_by_dir.items():
            # Mirror the source directory below path_int. relpath only strips
            # the leading root, unlike str.replace which removes every match.
            rel_dir = os.path.relpath(subdir, path_im)
            if rel_dir == os.curdir:
                subdir_path_int = path_int
            else:
                subdir_path_int = os.path.join(path_int, rel_dir)

            filtered_fnames = [fname_im for fname_im in subdir_fnames if "metadata" not in fname_im]
            if not filtered_fnames:
                print(f"No valid filenames found in subdirectory {subdir}.")
                continue

            # Create the output directory once, including missing parents,
            # before any worker runs (avoids a mkdir race between threads).
            os.makedirs(subdir_path_int, exist_ok=True)

            results_data = pool.map(
                lambda fname_im, out_dir=subdir_path_int: integration_thread(fname_im, out_dir),
                filtered_fnames,
            )
            # Drop the files that were skipped as already integrated.
            results_data = [row for row in results_data if row is not None]
            if not results_data:
                print(f"No images were integrated in subdirectory {subdir}. No results DataFrame created.")
                continue

            results_df = pd.DataFrame(results_data)
            results_df = results_df.sort_values(by="filename", key=lambda col: col.str.lower())
            subdir_name = os.path.basename(os.path.normpath(subdir_path_int))
            results_df.to_csv(os.path.join(subdir_path_int, f"{subdir_name}.csv"), index=False)
            results_df.to_hdf(os.path.join(subdir_path_int, f"{subdir_name}.h5"), key='data', mode='w')
            print(f"Results for subdirectory {subdir_name} saved to CSV and HDF5 files.")
            # Free the table before the next directory is processed.
            del results_df
def integrate_on_created(event, path_int, dtype_im=".tif", dtype_int=".dat"):
    """
    Azimuthally integrate the image announced by a file-created event and
    append the pattern to the module-level table ``results_df``.

    The output directory <path_int> is created if missing (including parents).
    Events whose path does not end with <dtype_im>, or whose path contains
    "metadata", are ignored. Each integrated image adds one row with the
    columns "q", "I", "dI" and "filename" to ``results_df``.

    Reads the module-level settings NPT, UNIT, POLARIZATION and ERRORMODE as
    well as the module-level objects ``ai`` and ``mask``.

    :param 'watchdog.events.FileCreatedEvent' event: watchdog object to check for created files
    :param 'str' path_int: directory belonging to the integrated patterns
    :param 'str' dtype_im: data type/filename ending of image file
    :param 'str' dtype_int: data type/filename ending of pattern file (kept
        for interface compatibility; not used here)
    """
    # Without this declaration the assignments below would create a local and
    # the accumulated table would be lost after every call.
    global results_df

    os.makedirs(path_int, exist_ok=True)

    src_path = event.src_path
    if not src_path.endswith(dtype_im):
        return
    # Test the path, not the pixel data, and do so before reading the image.
    if "metadata" in src_path:
        return

    im = fabio.open(src_path).data
    integrate_kwargs = dict(
        data=im,
        npt=NPT,
        mask=mask,
        polarization_factor=POLARIZATION,
        correctSolidAngle=True,
        error_model=ERRORMODE,
        unit=UNIT,
    )
    if ERRORMODE == "none":
        # Without an error model only (q, I) is returned; pad dI with zeros.
        q, I = ai.integrate1d(**integrate_kwargs)
        dI = np.zeros_like(I)
    else:
        q, I, dI = ai.integrate1d(**integrate_kwargs)

    data = {
        "q": q,
        "I": I,
        "dI": dI,
        "filename": src_path,
    }
    # Wrap the dict in a list so the image becomes a single row whose cells
    # hold the arrays, matching the layout written by integrate_ims_in_dir.
    new_row = pd.DataFrame([data])
    if 'results_df' not in globals():
        results_df = new_row
    else:
        results_df = pd.concat([results_df, new_row], ignore_index=True)
class Handler(PatternMatchingEventHandler):
    """
    Watchdog event handler that integrates newly created TIFF images.

    The directory of a created file, taken relative to <path_im>, is mirrored
    below <path_int> and the event is passed on to integrate_on_created.
    """
    # Class-level filter settings.
    # NOTE(review): presumably consulted by the watchdog base class when it
    # filters events -- confirm against the installed watchdog version.
    patterns = ["*tif","*tiff","*TIF","*TIFF"]
    ignore_patterns = []
    ignore_directories = True
    case_sensitive = True
    go_recursively = True

    def __init__(self, path_im, path_int):
        """
        :param 'str' path_im: root directory that is watched for new images
        :param 'str' path_int: root directory for the integrated patterns
        """
        PatternMatchingEventHandler.__init__(self)
        self.path_im = path_im
        self.path_int = path_int

    def on_created(self, event):
        """
        Map the created file to its output directory and integrate it.

        :param 'watchdog.events.FileCreatedEvent' event: event of the created file
        """
        # NOTE: no waiting for the end of the file transfer happens here; the
        # event is processed as soon as it arrives.
        path_event = str(os.path.dirname(event.src_path))
        print(path_event)
        # relpath copes with a trailing separator on path_im and strips only
        # the leading root, so files directly in path_im map to path_int.
        rel_dir = os.path.relpath(path_event, self.path_im)
        if rel_dir == os.curdir:
            path_int = self.path_int
        else:
            path_int = os.path.join(self.path_int, rel_dir)
        # Hand over the real file ending so that every pattern accepted above
        # (.tiff, .TIF, ...) is integrated, not only the default ".tif".
        dtype_im = os.path.splitext(event.src_path)[1] or ".tif"
        integrate_on_created(event, path_int, dtype_im=dtype_im)
if __name__ == '__main__':
    # Command-line entry point: validate the arguments, publish the settings
    # as module-level names (read by the integration functions), load the
    # geometry and mask, then integrate everything below <path_im> once.
    #
    # Ten positional arguments are required; an eleventh, FORBIDDEN, is an
    # optional comma-separated list of substrings that exclude file names.
    if len(sys.argv) not in (11, 12):
        print(
            f"Usage: python {os.path.basename(sys.argv[0])} <path_im> <path_int> <fname_poni> "
            "<fname_mask> <NPROC> <POLARIZATION> <NPT> <UNIT> <ERRORMODE> <DATATYPE> [FORBIDDEN]"
        )
        sys.exit(1)

    (path_im, path_int, fname_poni, fname_mask, nproc_arg,
     polarization_arg, npt_arg, unit_arg, errormode_arg, datatype_arg) = sys.argv[1:11]

    # Both directory arguments are used with a trailing slash.
    if not path_im.endswith('/'):
        path_im += '/'
    if not path_int.endswith('/'):
        path_int += '/'

    # --- syntactic checks on the raw strings ---------------------------------
    if not fname_poni.endswith('.poni'):
        raise ValueError("The poni file must have a .poni extension")
    if not fname_mask.endswith('.mask') and fname_mask != "None":
        raise ValueError("The mask file must have a .mask extension or be 'None' if no mask is used")
    if not nproc_arg.isdigit():
        raise ValueError("NPROC must be a positive integer")
    if not polarization_arg.replace('.', '', 1).isdigit() or not (0 <= float(polarization_arg) <= 1):
        raise ValueError("POLARIZATION must be a float between 0 and 1")
    if not npt_arg.isdigit():
        raise ValueError("NPT must be a positive integer")
    if not errormode_arg.isalpha():
        raise ValueError("ERRORMODE must be a string representing the error model (e.g., 'poisson', 'azimuthal', 'none')")

    # --- typed settings ------------------------------------------------------
    NPROC = int(nproc_arg)
    POLARIZATION = float(polarization_arg)
    NPT = int(npt_arg)
    UNIT = unit_arg
    ERRORMODE = errormode_arg.lower()
    # Accept "tif" as well as ".tif", in any letter case.
    DATATYPE = datatype_arg.lower()
    if not DATATYPE.startswith('.'):
        DATATYPE = '.' + DATATYPE
    # Drop empty tokens: an empty substring is contained in every file name
    # and would exclude all files.
    FORBIDDEN = [token for token in sys.argv[11].split(',') if token] if len(sys.argv) > 11 else []

    # --- semantic checks -----------------------------------------------------
    if DATATYPE not in {".tif", ".tiff"}:
        raise ValueError(f"Unsupported data type: {DATATYPE}")
    if UNIT not in {"q_A^-1", "q_nm^-1", "q_ang^-1"}:
        raise ValueError(f"Unsupported unit: {UNIT}")
    if NPROC <= 0:
        raise ValueError(f"Number of processes must be a positive integer: {NPROC}")
    if POLARIZATION < 0 or POLARIZATION > 1:
        raise ValueError(f"Polarization factor must be between 0 and 1: {POLARIZATION}")
    if NPT <= 0:
        raise ValueError(f"Number of points must be a positive integer: {NPT}")
    if not os.path.isdir(path_int):
        raise ValueError(f"Path to integrated patterns does not exist: {path_int}")
    if not os.path.isdir(path_im):
        raise ValueError(f"Path to images does not exist: {path_im}")
    if not os.path.isfile(fname_poni):
        raise ValueError(f"File with poni parameters does not exist: {fname_poni}")
    if not os.path.isfile(fname_mask) and fname_mask != "None":
        raise ValueError(f"File with mask does not exist: {fname_mask}")
    if ERRORMODE not in {"poisson", "azimuthal", "none"}:
        raise ValueError(f"Unsupported error model: {ERRORMODE}")

    ai = pyFAI.load(fname_poni)
    # "None" on the command line means "no mask"; only open a real file.
    mask = fabio.open(fname_mask).data if fname_mask != "None" else None

    integrate_ims_in_dir(path_im, path_int, dtype_im=DATATYPE)

    # Disabled watch mode, kept for reference: observe <path_im> and integrate
    # images as they are created.
    # print("Observing directory: " +str(path_im))
    # event_handler = Handler(path_im,path_int)
    # observer = PollingObserver()
    # observer.schedule(event_handler, path_im, recursive=True)
    # print("About to start observer")
    # observer.start()
    # try:
    #     while True:
    #         sleep(0.05)
    # except KeyboardInterrupt:
    #     observer.stop()
    # observer.join()