Commit 91ec1538 authored by Gröne, Tjark Leon Raphael

Update file maxwell_integrate_to_h5.py

parent b0c88cab
@@ -32,16 +32,22 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
:param 'str' dtype_im: data type/filename ending of image file
:param 'str' dtype_int: data type/filename ending of pattern file
"""
# Get global variables
global NPROC
global FORBIDDEN
seen = []
fnames_ims = []#= glob(os.path.join(path_im, "*" + dtype_im))
fnames_metadata = []#= glob(os.path.join(path_im, "*" + ".metadata"))
# Create empty lists to store the image filenames and the corresponding output paths
fnames_ims = []
fnames_metadata = []
path_int_list = []
# Look for all files in the directory and subdirectories
for path, subdirs, files in os.walk(path_im):
# Here we separate files with and without the metadata extension: TIFs will be integrated, metadata files will be used to extract metadata
for name in files:
if not any(forbidden in name for forbidden in FORBIDDEN):
fnames_ims.append(os.path.join(path, name))
# Create the output path for the integrated patterns
if path_im != str(path):
path_new = str(path).replace(path_im,'')
path_new = path_int + path_new
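The loop above rebuilds the output path by stripping the path_im prefix. As an illustration only (not part of this commit), the same mapping expressed with os.path.relpath, using hypothetical paths:

import os

path_im = "/data/images"                    # hypothetical input root
path_int = "/data/integrated_patterns"      # hypothetical output root
subdir = "/data/images/sample_A/run_01"
# Mirror the subdirectory structure under the output root
path_new = os.path.join(path_int, os.path.relpath(subdir, path_im))
# -> "/data/integrated_patterns/sample_A/run_01"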
@@ -60,9 +66,12 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
:param fnames_metadata: List of filenames with .metadata extension.
:return: Dictionary containing metadata.
"""
# For each name entered, we extract the metadata from the file
metadata = {}
# Open the metadata file and read its contents
with open(os.path.join(path, name), 'r') as metadata_file:
# We scan line by line for keywords and extract the values
for line in metadata_file:
if line.startswith("dateString="):
metadata["dateString"] = line.split("=", 1)[1].strip()
@@ -85,20 +94,25 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
elif line.startswith("imageSequenceNumber="):
metadata["imageSequenceNumber"] = line.split("=", 1)[1].strip()
metadata["filename"] = name
# Return the metadata dictionary; conversion to a DataFrame and sorting by filename happen later
return metadata
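For reference, a minimal standalone sketch (not part of this commit) of how a single key=value line is parsed above, using a hypothetical metadata line:

line = "exposureTime=0.1\n"                  # hypothetical line from a .metadata file
if line.startswith("exposureTime="):
    # split only on the first '=' so values containing '=' stay intact
    value = line.split("=", 1)[1].strip()    # -> "0.1"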
def integration_thread(fname_im,path_int):
def integration_thread(fname_im):
"""
Integrate a single image file and return the results.
:param fname_im: Filename of the image to integrate.
:param path_int: Path to save the integrated pattern.
:return: Dictionary containing q, I, dI, and filename.
"""
# Get global variables
global NPT
global UNIT
global POLARIZATION
global ERRORMODE
# Open the image file with fabio
im = fabio.open(fname_im).data
basename_int = os.path.basename(fname_im)[:-len(dtype_im)] + dtype_int
fname_int = os.path.join(path_int, basename_int)
if not os.path.isfile(fname_int):
# Perform integration and return results instead of saving to file
if ERRORMODE == "none":
q, I = ai.integrate1d(
@@ -122,6 +136,7 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
unit=UNIT,
)
# Create the data dictionary to return
data = {
"q": q,
"I": I,
@@ -132,27 +147,39 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
return data
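As a standalone reference, a minimal sketch (not part of this commit) of the pyFAI call pattern used in integration_thread; file names and parameter values are hypothetical:

import fabio
import pyFAI

ai = pyFAI.load("geometry.poni")            # hypothetical calibration file
im = fabio.open("frame_0001.tif").data      # hypothetical detector image
# With an error model set, integrate1d also returns the uncertainty estimate dI
q, I, dI = ai.integrate1d(
    im,
    2000,                                   # number of radial points (NPT)
    unit="q_A^-1",
    polarization_factor=0.99,
    error_model="poisson",
)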
pool = Pool(int(NPROC))
# Loop through all subdirectories and integrate images
for subdir in set(os.path.dirname(fname) for fname in fnames_ims):
# Get filenames and metadata for the current subdirectory
subdir_fnames = [fname for fname in fnames_ims if os.path.dirname(fname) == subdir]
subdir_fnames_metadata = [fname_meta for fname_meta in fnames_metadata if os.path.dirname(fname_meta) == subdir]
# Check if there are any images in the subdirectory
if not subdir_fnames:
print(f"No images found in subdirectory: {subdir}")
continue
# Get the first filename in the subdirectory to create the output path
subdir_path_int = path_int_list[fnames_ims.index(subdir_fnames[0])]
# Create the output directory if it does not exist
if not os.path.isdir(subdir_path_int):
os.mkdir(subdir_path_int)
# We split the list into image files and metadata files to process them separately (redundant check)
filtered_fnames = [fname_im for fname_im in subdir_fnames if "metadata" not in fname_im]
filetered_metadata = [fname_im for fname_im in subdir_fnames_metadata if "metadata" in fname_im]
# We use async processing to integrate images and extract metadata, which reduces processing time on the server
if filtered_fnames:
# Here we create a new pool for each subdirectory to avoid issues with shared state
pool = Pool(int(NPROC))
# Use map_async to apply the integration_thread function to all filtered filenames
print(f"Integrating images in subdirectory: {subdir}")
# Use a lambda function to pass the subdir_path_int to the integration_thread
# map_async lets us run the integration in parallel and collect the results afterwards
async_result = pool.map_async(
lambda fname_im: integration_thread(fname_im, subdir_path_int),
filtered_fnames
@@ -160,75 +187,104 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
if filetered_metadata:
print(f"Extracting metadata in subdirectory: {subdir}")
# Metadata extraction can run in the same pool, but it should be executed after async_result
pool.close()
pool.join() # Ensure image integration tasks are completed before starting metadata extraction
# Create a new pool for metadata extraction
metadata_pool = Pool(int(NPROC))
# Use map_async to apply the metadata_thread function to all filtered metadata filenames
# Use a lambda function to pass the subdir_path_int to the metadata_thread
# map_async lets us run the metadata extraction in parallel and collect the results afterwards
async_metadata_result = metadata_pool.map_async(
lambda fname_meta: metadata_thread(fname_meta),
filetered_metadata
)
# Wait for the metadata extraction to complete
metadata_pool.close()
metadata_pool.join()
else:
# If no metadata files are found, just close the pool and wait for the integration tasks to finish
pool.close()
pool.join()
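For reference, a minimal sketch (not part of this commit) of the map_async / close / join / get pattern used here, shown with a thread pool and a toy worker function:

from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

pool = ThreadPool(4)
async_result = pool.map_async(square, [1, 2, 3])
pool.close()                  # no more tasks will be submitted
pool.join()                   # block until all submitted tasks have finished
print(async_result.get())     # -> [1, 4, 9]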
# Check that the async results are ready (all worker tasks have finished)
if async_result.ready() and async_metadata_result.ready():
# Retrieve results from async_result
results_data = async_result.get()
results_metadata = async_metadata_result.get()
# Convert results to DataFrames
results_df = pd.DataFrame(results_data)
results_metadata_df = pd.DataFrame(results_metadata)
# We sort by filename to ensure a consistent order
results_df = results_df.sort_values(by="filename", key=lambda col: col.str.lower())
results_metadata_df = results_metadata_df.sort_values(by="filename", key=lambda col: col.str.lower())
# Check if the metadata DataFrame has the same columns as the results DataFrame
for key in results_metadata_df.columns:
if key not in results_df.columns:
# Merge metadata columns into the results DataFrame (only keys not already present; e.g. filename is skipped, it is used for sorting)
results_df[key] = results_metadata_df[key].values
# We export the results DataFrame to a CSV file in the subdirectory
subdir_name = os.path.basename(os.path.normpath(subdir_path_int))
results_df.to_csv(os.path.join(subdir_path_int, f"{subdir_name}.csv"), index=False)
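A small pandas sketch (not part of this commit) of the column merge performed above, with hypothetical toy frames that are already sorted by filename:

import pandas as pd

results_df = pd.DataFrame({"filename": ["a.tif", "b.tif"], "I": [1.0, 2.0]})
metadata_df = pd.DataFrame({"filename": ["a.tif", "b.tif"], "dateString": ["t0", "t1"]})
for key in metadata_df.columns:
    if key not in results_df.columns:
        # Row order must match in both frames, hence the prior sorting by filename
        results_df[key] = metadata_df[key].values
# results_df now has the columns: filename, I, dateString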
# Sort results_data and results_metadata by filename using a natural sort key (as for the dataframe before)
# It is possible to use pandas to export the data to HDF5, but we want to use h5py for more control (PyMca compatibility)
def natural_sort_key(item):
return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', item["filename"])]
# Sort results_data and results_metadata by filename
results_data = sorted(results_data, key=natural_sort_key)
results_metadata = sorted(results_metadata, key=natural_sort_key)
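Illustrative only (not part of this commit): how natural_sort_key defined above orders hypothetical filenames, compared with plain lexicographic sorting:

items = [{"filename": name} for name in ["scan_10.tif", "scan_2.tif", "scan_1.tif"]]
ordered = sorted(items, key=natural_sort_key)
# -> scan_1.tif, scan_2.tif, scan_10.tif (plain string sorting would place scan_10 before scan_2)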
# Create the output HDF5 file in the subdirectory
# Check if the output file already exists, if so, remove it
output_file = os.path.join(subdir_path_int, f"{subdir_name}.h5")
if os.path.exists(output_file):
print(f"File {output_file} already exists. Removing it to create a new one.")
os.remove(output_file)
# Create the HDF5 file with the results
with h5py.File(output_file, "w", libver="latest", track_order=True) as h5:
# Create the root group and set its attributes
h5.attrs["NX_class"] = "NXroot"
# Create a group for each scan
for idx, result in enumerate(results_data, start=1):
# Drop unfinished scans (usually the last scan, due to a closed shutter)
if not result["q"].size or not result["I"].size:
print(f"Skipping invalid scan data for entry {idx:05d}.1")
continue
# Here one could use the image sequence number from the metadata; however, we use the index as it seems cleaner
entry_name = f"{idx:05d}.1"
# Create a new entry for each scan
entry = h5.create_group(entry_name)
entry["title"] = "Collected Q-I scans"
entry.attrs["NX_class"] = "NXentry"
# entry.create_dataset("time", data=results_metadata[idx-1]["dateString"].encode('utf-8'), dtype=h5py.string_dtype(encoding='utf-8'))
entry.attrs["time"] = results_metadata[idx-1]["dateString"].encode('utf-8')
if image_sequence_number.isdigit():
entry.attrs["image sequence number"] = int(image_sequence_number)
# Set time attributes for the entry
entry.create_dataset("time", data=results_metadata[idx-1]["dateString"].encode('utf-8'), dtype=h5py.string_dtype(encoding='utf-8'))
# We log the image sequence number if it is available in the metadata
image_sequence_number = results_metadata[idx-1].get("imageSequenceNumber", "").strip()
if image_sequence_number.isdigit():
entry.create_dataset("image sequence number", data=np.asarray([int(image_sequence_number)], dtype=np.int32), dtype="i4", compression_opts=4, compression="gzip")
# User comments can be added to the entry if available
# We check if any of the user comments are available, if so, we create a comments group
if any(results_metadata[idx-1][key] for key in ["userComment1", "userComment2", "userComment3", "userComment4"]):
comments = entry.create_group("comments")
comments.attrs["NX_class"] = "NXcomments"
@@ -242,12 +298,18 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
comments.create_dataset("userComment4", data=results_metadata[idx-1]["userComment4"].encode('utf-8'), dtype=h5py.string_dtype(encoding='utf-8'),compression_opts=4, compression="gzip")
# Instrument / Detector group
# Instrument / Detector group (holds all detector data)
detector = entry.create_group("instrument/detector")
detector.attrs["NX_class"] = "NXdetector"
# Compress the data to save space, chunks are used to allow for efficient reading
# Larger chunk sizes increase compression but may slow down reading
# 256 is a common chunk size (512 is also a good choice for larger datasets); over 1024 may lead to memory issues
chunk_size = 512
detector.create_dataset("q [1/Å]", data=np.asarray(result["q"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
# Create datasets for q, I, and dI with compression
# We use np.asarray to ensure the data is in the correct format
# and dtype is set to float64 for better precision
detector.create_dataset("q [Å^-1]", data=np.asarray(result["q"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
detector.create_dataset("I", data=np.asarray(result["I"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
detector.create_dataset("dI", data=np.asarray(result["dI"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
@@ -256,14 +318,16 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
height = results_metadata[idx-1].get("height", "").strip()
exposure_time = results_metadata[idx-1].get("exposureTime", "").strip()
summed_exposures = results_metadata[idx-1].get("summedExposures", "").strip()
image_sequence_number = results_metadata[idx-1].get("imageSequenceNumber", "").strip()
# Create detector size dataset if width and height are valid integers
# We check if the width and height are digits (i.e., valid integers)
if width.isdigit() and height.isdigit():
det_size = detector.create_group("detector size")
det_size.attrs["NX_class"] = "NXcollection"
det_size.create_dataset("detector width [pixel]", data=np.asarray([int(width)], dtype=np.int32), dtype="i4", compression_opts=4, compression="gzip")
det_size.create_dataset("detector height [pixel]", data=np.asarray([int(height)], dtype=np.int32), dtype="i4", compression_opts=4, compression="gzip")
# We also track exposure time and summed exposures if they are valid
if exposure_time.isdigit():
detector.create_dataset("exposure time [s]", data=np.asarray([float(exposure_time)], dtype=np.float32), dtype="f4", compression_opts=4, compression="gzip")
if summed_exposures.replace('.', '', 1).isdigit():
@@ -279,27 +343,31 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
meas.attrs["axes"] = "q"
meas.attrs["filename"] = result["filename"]
# Create soft links to the detector datasets
# We use soft links to the detector datasets to allow for easy access
# This is useful for PyMca and other tools that expect these links
meas["I"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/I")
meas["q [1/Å]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [1/Å]")
meas["q [Å^-1]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [Å^-1]")
meas["dI"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/dI")
# Optional display-friendly names
meas["I"].attrs["long_name"] = "Intensity"
meas["q [1/Å]"].attrs["long_name"] = "Q [1/Å]"
meas["q [Å^-1]"].attrs["long_name"] = "Q [1/Å]"
# Measurement group (holds soft links)
# We create a plotselect group to allow for easy plotting in h5Web or PyMca
# This group will contain soft links to the datasets in the measurement group
plotselect = entry.create_group("plotselect")
plotselect.attrs["NX_class"] = "NXdata"
plotselect.attrs["signal"] = "I"
plotselect.attrs["axes"] = "q"
plotselect["I"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/I")
plotselect["q [1/Å]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [1/Å]")
plotselect["q [Å^-1]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [Å^-1]")
# Optional display-friendly names
plotselect["I"].attrs["long_name"] = "Intensity"
plotselect["q [1/Å]"].attrs["long_name"] = "Q [1/A]"
plotselect["q [Å^-1]"].attrs["long_name"] = "Q [1/Å]"
# For PyMca auto-plot:
entry.attrs["default"] = "plotselect"
@@ -311,7 +379,7 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
print(f"✅ HDF5 file '{output_file}' created with {len(results_data)} spectra.")
# Clean the results DataFrame from memory (redundant, but good practice)
del results_df
else:
print(f"No images were integrated in subdirectory {subdir}. No results DataFrame created.")
@@ -330,7 +398,7 @@ def integrate_on_created(event, path_int, dtype_im=".tif", dtype_int=".dat"):
:param 'str' dtype_im: data type/filename ending of image file
:param 'str' dtype_int: data type/filename ending of pattern file
"""
# Still needs modification for csv and h5 output
if not os.path.isdir(path_int):
os.mkdir(path_int)
@@ -377,31 +445,50 @@ def integrate_on_created(event, path_int, dtype_im=".tif", dtype_int=".dat"):
class Handler(PatternMatchingEventHandler):
"""
Handler for file creation events in a directory.
This class extends PatternMatchingEventHandler to monitor specific file patterns
and trigger integration when new files are created.
It is used to integrate images in a directory and its subdirectories.
"""
# We define the patterns to match for file creation events
# The patterns are defined as a list of strings, where each string is a glob pattern
# The patterns are case-sensitive and include both lower and upper case extensions
patterns = ["*tif","*tiff","*TIF","*TIFF"]
ignore_patterns = []
ignore_directories = True
case_sensitive = True
go_recursively = True
# We define the constructor to initialize the handler with the paths
def __init__(self, path_im, path_int):
PatternMatchingEventHandler.__init__(self)
self.path_im = path_im
self.path_int = path_int
# We define the on_created method to handle file creation events
def on_created(self, event):
# Wait until the file transfer has finished before processing it
path_event = str(os.path.dirname(event.src_path))
print(path_event)
# Create the path for the integrated pattern
if self.path_im != path_event:
if self.path_im in path_event:
path_event = path_event.replace(self.path_im,'')
path_int = self.path_int + path_event
else:
path_int = self.path_int
# Integrate the image using the integrate_on_created function
integrate_on_created(event,path_int)
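For context, a minimal sketch (not part of this commit) of how a handler like this is typically wired to a watchdog Observer; the paths are hypothetical:

import time
from watchdog.observers import Observer

handler = Handler("/data/images", "/data/integrated_patterns")   # hypothetical paths
observer = Observer()
observer.schedule(handler, "/data/images", recursive=True)
observer.start()
try:
    while True:
        time.sleep(1)
finally:
    observer.stop()
    observer.join()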
if __name__ == '__main__':
# Check if the correct number of arguments is provided
# System arguments are expected in the following order:
# <path_im> <path_int> <fname_poni> <fname_mask> <NPROC> <POLARIZATION> <NPT> <UNIT> <ERRORMODE> <DATATYPE> <FORBIDDEN>
# System arguments are expected to be provided in the command line / .sh file
if len(sys.argv) != 12:
print("Usage: python maxwell_integrate_with_subdirs.py <path_im> <path_int> <fname_poni> <fname_mask> <NPROC> <POLARIZATION> <NPT> <UNIT> <ERRORMODE> <DATATYPE> <FORBIDDEN>")
sys.exit(1)
@@ -420,7 +507,6 @@ if __name__ == '__main__':
if not sys.argv[7].isdigit():
raise ValueError("NPT must be a positive integer")
if not isinstance(sys.argv[8],str):
raise ValueError("UNIT must be a string representing the unit (e.g., 'q_A^-1', 'q_nm^-1', 'q_ang^-1')")
if not sys.argv[9].isalpha():
raise ValueError("ERRORMODE must be a string representing the error model (e.g., 'poisson', 'azimuthal', 'none')")
@@ -428,18 +514,20 @@ if __name__ == '__main__':
raise ValueError("DATATYPE must be a string representing the data type (e.g., 'tif', 'tiff')")
path_im=sys.argv[1]
path_int=sys.argv[2]
fname_poni=sys.argv[3]
fname_mask=sys.argv[4]
NPROC=int(sys.argv[5])
POLARIZATION=float(sys.argv[6])
NPT=int(sys.argv[7])
UNIT=str(sys.argv[8])
ERRORMODE = str(sys.argv[9]).lower()
DATATYPE = str(sys.argv[10]).lower()
FORBIDDEN = sys.argv[11].split(',') if len(sys.argv) > 11 else []
# Parse the command line arguments
path_im=sys.argv[1] # Path to images, e.g. "/data/images/"
path_int=sys.argv[2] # Path to integrated patterns, e.g. "/data/integrated_patterns/"
fname_poni=sys.argv[3] # File with poni parameters, e.g. "/data/poni/poni_file.poni"
fname_mask=sys.argv[4] # File with mask, e.g. "/data/mask/mask.edf" or "None" if no mask is used
NPROC=int(sys.argv[5]) # Number of processes to use for integration
POLARIZATION=float(sys.argv[6]) # Polarization factor, e.g. 0.9
NPT=int(sys.argv[7]) # Number of points for integration
UNIT=str(sys.argv[8]) # Unit for q, e.g. "q_A^-1", "q_nm^-1", "q_ang^-1"
ERRORMODE = str(sys.argv[9]).lower() # Error model, e.g. "poisson", "azimuthal", "none"
DATATYPE = str(sys.argv[10]).lower() # Data type of the images, e.g. ".tif", ".tiff"
FORBIDDEN = sys.argv[11].split(',') if len(sys.argv) > 11 else [] # Forbidden substrings in filenames, e.g. "metadata,thumbs.db"
# Check if the provided arguments are valid
if DATATYPE not in {".tif", ".tiff", ".TIF", ".TIFF"}:
raise ValueError(f"Unsupported data type: {DATATYPE}")
if UNIT not in {"q_A^-1", "q_nm^-1", "q_ang^-1"}:
@@ -461,13 +549,15 @@ if __name__ == '__main__':
if ERRORMODE not in {"poisson", "azimuthal", "none"}:
raise ValueError(f"Unsupported error model: {ERRORMODE}")
# Create the output directory if it does not exist
if not os.path.isdir(path_int):
os.mkdir(path_int)
# We open the poni file and the mask file (if provided)
ai = pyFAI.load(fname_poni)
mask = fabio.open(fname_mask).data if fname_mask else None
# Integrate images in the directory
integrate_ims_in_dir(path_im, path_int)
# print("Observing directory: " +str(path_im))