From 91ec1538132dc1690a94d11df46c717ff8997c6d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gr=C3=B6ne=2C=20Tjark=20Leon=20Raphael?=
 <tjark.leon.raphael.groene@uni-hamburg.de>
Date: Wed, 18 Jun 2025 15:39:51 +0200
Subject: [PATCH] Update file maxwell_integrate_to_h5.py

---
 maxwell_integrate_to_h5.py | 252 +++++++++++++++++++++++++------------
 1 file changed, 171 insertions(+), 81 deletions(-)

diff --git a/maxwell_integrate_to_h5.py b/maxwell_integrate_to_h5.py
index 11a030f..0897401 100644
--- a/maxwell_integrate_to_h5.py
+++ b/maxwell_integrate_to_h5.py
@@ -32,16 +32,22 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
     :param 'str' dtype_im: data type/filename ending of image file
     :param 'str' dtype_int: data type/filename ending of pattern file
     """
+    # Get global variables
     global NPROC
     global FORBIDDEN
-    seen = []
-    fnames_ims = []#= glob(os.path.join(path_im, "*" + dtype_im))
-    fnames_metadata = []#= glob(os.path.join(path_im, "*" + ".metadata"))
+
+    # Create empty lists to store the used filenames and the created output paths
+    fnames_ims = []
+    fnames_metadata = []
     path_int_list = []
+
+    # Look for all files in the directory and its subdirectories
     for path, subdirs, files in os.walk(path_im):
+        # Separate files with and without the metadata ending --> TIFs will be integrated, .metadata files will be used to extract metadata
         for name in files:
             if not any(forbidden in name for forbidden in FORBIDDEN):
                 fnames_ims.append(os.path.join(path, name))
+                # Create the output path for the integrated patterns
                 if path_im != str(path):
                     path_new = str(path).replace(path_im,'')
                     path_new = path_int + path_new
@@ -50,7 +56,7 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
                     path_int_list.append(path_new)
             if "metadata" in name:
                 fnames_metadata.append(os.path.join(path, name))
-
+
     #fnames_ims.sort(key=str.lower)
@@ -60,9 +66,12 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
         :param fnames_metadata: List of filenames with .metadata extension.
         :return: Dictonary containing metadata.
         """
+        # For each name passed in, we extract the metadata from the file
        metadata = {}
+        # Open the metadata file and read its contents
         with open(os.path.join(path, name), 'r') as metadata_file:
+            # We scan line by line for keywords and extract the values
             for line in metadata_file:
                 if line.startswith("dateString="):
                     metadata["dateString"] = line.split("=", 1)[1].strip()
@@ -85,74 +94,92 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
             elif line.startswith("imageSequenceNumber="):
                 metadata["imageSequenceNumber"] = line.split("=", 1)[1].strip()
         metadata["filename"] = name
-        # Convert metadata dictionary to a DataFrame and sort by filename
+
         return metadata
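As a rough illustration of what the metadata extraction above produces: the .metadata files are read as plain key=value lines, so a snippet like the following (the file contents here are made up, not taken from a real .metadata file) ends up as a small dictionary keyed by the names checked above.

    # Hypothetical key=value content of a .metadata file, for illustration only
    sample = "dateString=2025-06-18 15:39:51\nwidth=2048\nheight=2048\nexposureTime=1"

    metadata = {}
    for line in sample.splitlines():
        key, value = line.split("=", 1)   # same split("=", 1) idea as above
        metadata[key] = value.strip()

    print(metadata["dateString"])   # -> 2025-06-18 15:39:51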
+ """ + # Get global variables global NPT global UNIT global POLARIZATION global ERRORMODE + # Open the image file with fabio im = fabio.open(fname_im).data - basename_int = os.path.basename(fname_im)[:-len(dtype_im)] + dtype_int - fname_int = os.path.join(path_int, basename_int) - - if not os.path.isfile(fname_int): - # Perform integration and return results instead of saving to file - if ERRORMODE == "none": - q, I = ai.integrate1d( - data=im, - npt=NPT, - mask=mask, - polarization_factor=POLARIZATION, - correctSolidAngle=True, - error_model=ERRORMODE, - unit=UNIT, - ) - dI = np.zeros_like(I) - else: - q, I, dI = ai.integrate1d( - data=im, - npt=NPT, - mask=mask, - polarization_factor=POLARIZATION, - correctSolidAngle=True, - error_model=ERRORMODE, - unit=UNIT, - ) + + # Perform integration and return results instead of saving to file + if ERRORMODE == "none": + q, I = ai.integrate1d( + data=im, + npt=NPT, + mask=mask, + polarization_factor=POLARIZATION, + correctSolidAngle=True, + error_model=ERRORMODE, + unit=UNIT, + ) + dI = np.zeros_like(I) + else: + q, I, dI = ai.integrate1d( + data=im, + npt=NPT, + mask=mask, + polarization_factor=POLARIZATION, + correctSolidAngle=True, + error_model=ERRORMODE, + unit=UNIT, + ) - data = { - "q": q, - "I": I, - "dI": dI, - "filename": fname_im - } - - return data + # Create the data dictionary to return + data = { + "q": q, + "I": I, + "dI": dI, + "filename": fname_im + } + + return data - pool = Pool(int(NPROC)) + # Loop through all subdirectories and integrate images for subdir in set(os.path.dirname(fname) for fname in fnames_ims): - + # Get filenames and metadata for the current subdirectory subdir_fnames = [fname for fname in fnames_ims if os.path.dirname(fname) == subdir] subdir_fnames_metadata = [fname_meta for fname_meta in fnames_metadata if os.path.dirname(fname_meta) == subdir] + # Checl if there are any images in the subdirectory if not subdir_fnames: print(f"No images found in subdirectory: {subdir}") continue + + # Get the first filename in the subdirectory to create the output path subdir_path_int = path_int_list[fnames_ims.index(subdir_fnames[0])] + # Create the output directory if it does not exist if not os.path.isdir(subdir_path_int): os.mkdir(subdir_path_int) + # We filter out metadata files and images to process them separately (redundend check) filtered_fnames = [fname_im for fname_im in subdir_fnames if "metadata" not in fname_im] filetered_metadata = [fname_im for fname_im in subdir_fnames_metadata if "metadata" in fname_im] + # We use async processing to integrate images and extract metadata, this reduces processing time on server if filtered_fnames: + # Here we create a new pool for each subdirectory to avoid issues with shared state + pool = Pool(int(NPROC)) + # Use map_async to apply the integration_thread function to all filtered filenames print(f"Integrating images in subdirectory: {subdir}") + + # Use a lambda function to pass the subdir_path_int to the integration_thread + # Map async allows us to run the integration in parallel, with trace back of varibles async_result = pool.map_async( lambda fname_im: integration_thread(fname_im, subdir_path_int), filtered_fnames @@ -160,75 +187,104 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"): if filetered_metadata: print(f"Extracting metadata in subdirectory: {subdir}") + # Metadata extraction can run in the same pool, but it should be executed after async_result pool.close() pool.join() # Ensure image integration tasks are completed before 
@@ -160,75 +187,104 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
             )
 
         if filetered_metadata:
             print(f"Extracting metadata in subdirectory: {subdir}")
+            # Metadata extraction must wait until the image-integration tasks have finished
             pool.close()
             pool.join()  # Ensure image integration tasks are completed before starting metadata extraction
 
             # Create a new pool for metadata extraction
             metadata_pool = Pool(int(NPROC))
+
+            # Use map_async to apply the metadata_thread function to all filtered metadata filenames
+            # The lambda simply forwards each metadata filename to metadata_thread;
+            # map_async runs the extraction in parallel and lets us collect the results afterwards
             async_metadata_result = metadata_pool.map_async(
                 lambda fname_meta: metadata_thread(fname_meta),
                 filetered_metadata
             )
+
+            # Wait for the metadata extraction to complete
             metadata_pool.close()
             metadata_pool.join()
         else:
+            # If no metadata files are found, just close and join the image pool
             pool.close()
             pool.join()
 
-        # Export the DataFrame to a CSV file with the name of the subdirectory
+        # Proceed once both async results are ready (all workers have finished)
         if async_result.ready() and async_metadata_result.ready():
+            # Retrieve the results from the async calls
            results_data = async_result.get()
            results_metadata = async_metadata_result.get()
 
+            # Convert the results to DataFrames
            results_df = pd.DataFrame(results_data)
            results_metadata_df = pd.DataFrame(results_metadata)
 
+            # Sort by filename to ensure a consistent order
            results_df = results_df.sort_values(by="filename", key=lambda col: col.str.lower())
            results_metadata_df = results_metadata_df.sort_values(by="filename", key=lambda col: col.str.lower())
 
+            # Check whether the metadata DataFrame has columns that the results DataFrame lacks
            for key in results_metadata_df.columns:
                if key not in results_df.columns:
+                    # Combine metadata into the results DataFrame (only keys not already present, e.g. the filename used for sorting)
                    results_df[key] = results_metadata_df[key].values
-
+
+            # We export the results DataFrame to a CSV file in the subdirectory
            subdir_name = os.path.basename(os.path.normpath(subdir_path_int))
            results_df.to_csv(os.path.join(subdir_path_int, f"{subdir_name}.csv"), index=False)
 
+            # Sort results_data and results_metadata by filename using a natural sort key (as for the DataFrames above)
+            # It is possible to use pandas to export the data to HDF5, but we use h5py for more control (PyMca compatibility)
            def natural_sort_key(item):
                return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', item["filename"])]
 
+            # Sort results_data and results_metadata by filename
            results_data = sorted(results_data, key=natural_sort_key)
            results_metadata = sorted(results_metadata, key=natural_sort_key)
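The natural sort key defined above is what keeps, say, scan_10 after scan_2 instead of sorting purely lexicographically; a small self-contained check (the filenames are invented):

    import re

    def natural_sort_key(item):
        return [int(text) if text.isdigit() else text.lower()
                for text in re.split(r'(\d+)', item["filename"])]

    rows = [{"filename": "scan_10.tif"}, {"filename": "scan_2.tif"}]
    print([r["filename"] for r in sorted(rows, key=natural_sort_key)])
    # ['scan_2.tif', 'scan_10.tif']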
+            # Create the output HDF5 file in the subdirectory
+            # Check whether the output file already exists; if so, remove it
             output_file = os.path.join(subdir_path_int, f"{subdir_name}.h5")
             if os.path.exists(output_file):
-                print(f"File {output_file} already exists. Removing it to create a new one.")
                 os.remove(output_file)
-
+            # Create the HDF5 file with the results
             with h5py.File(output_file, "w", libver="latest", track_order=True) as h5:
+                # Create the root group and set its attributes
                 h5.attrs["NX_class"] = "NXroot"
+                # Create a group for each scan
                 for idx, result in enumerate(results_data, start=1):
-
+
+                    # Drop unfinished scans (usually the last scan, due to a closed shutter)
                     if not result["q"].size or not result["I"].size:
                         print(f"Skipping invalid scan data for entry {entry_name}")
                         continue
-
+
+                    # Here one could use the image sequence number from the metadata; however, we use the index as it seems cleaner
                     entry_name = f"{idx:05d}.1"
+
+                    # Create a new entry for each scan
                     entry = h5.create_group(entry_name)
                     entry["title"] = "Collected Q-I scans"
                     entry.attrs["NX_class"] = "NXentry"
-                    # entry.create_dataset("time", data=results_metadata[idx-1]["dateString"].encode('utf-8'), dtype=h5py.string_dtype(encoding='utf-8'))
-                    entry.attrs["time"] = results_metadata[idx-1]["dateString"].encode('utf-8')
+
+                    # Store the acquisition time of the scan as a string dataset
+                    entry.create_dataset("time", data=results_metadata[idx-1]["dateString"].encode('utf-8'), dtype=h5py.string_dtype(encoding='utf-8'))
+
+                    # We log the image sequence number if it is available in the metadata
+                    image_sequence_number = results_metadata[idx-1].get("imageSequenceNumber", "").strip()
                     if image_sequence_number.isdigit():
-                        entry.attrs["image sequence number"] = int(image_sequence_number)
-
+                        entry.create_dataset("image sequence number", data=np.asarray([int(image_sequence_number)], dtype=np.int32), dtype="i4", compression_opts=4, compression="gzip")
 
+                    # User comments can be added to the entry if available
+                    # We check whether any of the user comments are present; if so, we create a comments group
                     if any(results_metadata[idx-1][key] for key in ["userComment1", "userComment2", "userComment3", "userComment4"]):
                         comments = entry.create_group("comments")
                         comments.attrs["NX_class"] = "NXcomments"
@@ -242,12 +298,18 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
                         comments.create_dataset("userComment4", data=results_metadata[idx-1]["userComment4"].encode('utf-8'), dtype=h5py.string_dtype(encoding='utf-8'),compression_opts=4, compression="gzip")
 
-                    # Instrument / Detector group
+                    # Instrument / Detector group (holds all detector data)
                     detector = entry.create_group("instrument/detector")
                     detector.attrs["NX_class"] = "NXdetector"
 
+                    # Compress the data to save space; chunks are used to allow for efficient reading
+                    # Larger chunk sizes increase compression but may slow down reading
+                    # 256 is a common chunk size (512 is also a good choice for larger datasets); over 1024 may lead to memory issues
                     chunk_size = 512
-                    detector.create_dataset("q [1/Å]", data=np.asarray(result["q"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
+                    # Create datasets for q, I, and dI with compression
+                    # We use np.asarray to ensure the data is in the correct format
+                    # and dtype is set to float64 for better precision
+                    detector.create_dataset("q [Å^-1]", data=np.asarray(result["q"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
                     detector.create_dataset("I", data=np.asarray(result["I"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
                     detector.create_dataset("dI", data=np.asarray(result["dI"], dtype=np.float64), chunks=(chunk_size,), dtype="f8", compression_opts=4, compression="gzip")
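A hedged note on the fixed chunk_size = 512: h5py rejects chunk shapes larger than the dataset itself, so if NPT (the length of q, I and dI) is below 512 the create_dataset calls above should raise a ValueError. A small sketch of a guard, with an assumed number of points:

    import numpy as np
    import h5py

    npt = 300                                   # assumed number of integration points
    q = np.linspace(0.1, 10.0, npt)

    with h5py.File("chunk_example.h5", "w") as h5:      # throwaway example file
        chunk = (min(512, len(q)),)                     # never exceed the dataset length
        h5.create_dataset("q", data=q, chunks=chunk,
                          compression="gzip", compression_opts=4)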
@@ -256,14 +318,16 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
                     height = results_metadata[idx-1].get("height", "").strip()
                     exposure_time = results_metadata[idx-1].get("exposureTime", "").strip()
                     summed_exposures = results_metadata[idx-1].get("summedExposures", "").strip()
-                    image_sequence_number = results_metadata[idx-1].get("imageSequenceNumber", "").strip()
-
-
+
+                    # Create the detector size dataset if width and height are valid integers
+                    # We check that width and height are digits (i.e. valid integers)
                     if width.isdigit() and height.isdigit():
                         det_size = detector.create_group("detector size")
                         det_size.attrs["NX_class"] = "NXcollection"
                         det_size.create_dataset("detector width [pixel]", data=np.asarray([int(width)], dtype=np.int32), dtype="i4", compression_opts=4, compression="gzip")
                         det_size.create_dataset("detector height [pixel]", data=np.asarray([int(height)], dtype=np.int32), dtype="i4", compression_opts=4, compression="gzip")
+
+                    # We also track the exposure time and summed exposures if they are valid
                     if exposure_time.isdigit():
                         detector.create_dataset("exposure time [s]", data=np.asarray([float(exposure_time)], dtype=np.float32), dtype="f4", compression_opts=4, compression="gzip")
                     if summed_exposures.replace('.', '', 1).isdigit():
@@ -279,27 +343,31 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
                     meas.attrs["axes"] = "q"
                     meas.attrs["filename"] = result["filename"]
-
+                    # Create soft links to the detector datasets
+                    # We use soft links to the detector datasets to allow for easy access
+                    # This is useful for PyMca and other tools that expect these links
                     meas["I"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/I")
-                    meas["q [1/Å]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [1/Å]")
+                    meas["q [Å^-1]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [Å^-1]")
                     meas["dI"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/dI")
 
                     # Optional display-friendly names
                     meas["I"].attrs["long_name"] = "Intensity"
-                    meas["q [1/Å]"].attrs["long_name"] = "Q [1/Å]"
+                    meas["q [Å^-1]"].attrs["long_name"] = "Q [1/Å]"
 
-                    # Measurement group (holds soft links)
+                    # Plot-selection group (holds soft links to the detector datasets)
+                    # We create a plotselect group to allow for easy plotting in h5Web or PyMca
+                    # This group contains soft links to the detector datasets
                     plotselect = entry.create_group("plotselect")
                     plotselect.attrs["NX_class"] = "NXdata"
                     plotselect.attrs["signal"] = "I"
                     plotselect.attrs["axes"] = "q"
                     plotselect["I"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/I")
-                    plotselect["q [1/Å]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [1/Å]")
+                    plotselect["q [Å^-1]"] = h5py.SoftLink(f"/{entry_name}/instrument/detector/q [Å^-1]")
 
                     # Optional display-friendly names
                     plotselect["I"].attrs["long_name"] = "Intensity"
-                    plotselect["q [1/Å]"].attrs["long_name"] = "Q [1/A]"
+                    plotselect["q [Å^-1]"].attrs["long_name"] = "Q [1/Å]"
 
                     # For PyMca auto-plot:
                     entry.attrs["default"] = "plotselect"
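For reference, this is roughly how an NXdata-aware tool can resolve the default plot written above; the filename and entry name below are assumed, and note that the axes attribute is set to "q" while the linked dataset is called "q [Å^-1]", so strict NXdata readers may not match the axis by name.

    import h5py

    with h5py.File("subdir.h5", "r") as h5:           # assumed output file name
        entry = h5["00001.1"]                          # assumed first entry
        plot = entry[entry.attrs["default"]]           # -> the "plotselect" group
        intensity = plot[plot.attrs["signal"]][()]     # "I", resolved through the soft link
        q_axis = plot["q [Å^-1]"][()]                  # q axis, resolved through the soft link
        print(intensity.shape, q_axis.shape)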
@@ -311,7 +379,7 @@ def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
 
             print(f"✅ HDF5 file '{output_file}' created with {len(results_data)} spectra.")
 
-
+            # Clean the results DataFrame from memory (redundant, but good practice)
             del results_df
         else:
             print(f"No images were integrated in subdirectory {subdir}. No results DataFrame created.")
@@ -330,7 +398,7 @@ def integrate_on_created(event, path_int, dtype_im=".tif", dtype_int=".dat"):
     :param 'str' dtype_im: data type/filename ending of image file
     :param 'str' dtype_int: data type/filename ending of pattern file
     """
-
+    # Still needs modification for CSV and HDF5 output
     if not os.path.isdir(path_int):
         os.mkdir(path_int)
@@ -377,31 +445,50 @@ def integrate_on_created(event, path_int, dtype_im=".tif", dtype_int=".dat"):
 
 class Handler(PatternMatchingEventHandler):
+    """
+    Handler for file creation events in a directory.
+    This class extends PatternMatchingEventHandler to monitor specific file patterns
+    and trigger integration when new files are created.
+    It is used to integrate images in a directory and its subdirectories.
+    """
+    # We define the patterns to match for file creation events
+    # The patterns are defined as a list of strings, where each string is a glob pattern
+    # The patterns are case-sensitive and include both lower- and upper-case extensions
     patterns = ["*tif","*tiff","*TIF","*TIFF"]
     ignore_patterns = []
     ignore_directories = True
     case_sensitive = True
     go_recursively = True
 
+    # We define the constructor to initialize the handler with the paths
     def __init__(self, path_im, path_int):
         PatternMatchingEventHandler.__init__(self)
         self.path_im = path_im
         self.path_int = path_int
 
+    # We define the on_created method to handle file creation events
     def on_created(self, event):
         #wait that the transfer of the file is finished before processing it
         path_event = str(os.path.dirname(event.src_path))
         print(path_event)
+
+        # Create the path for the integrated pattern
         if self.path_im != path_event:
             if self.path_im in path_event:
                 path_event = path_event.replace(self.path_im,'')
                 path_int = self.path_int + path_event
         else:
             path_int = self.path_int
+
+        # Integrate the image using the integrate_on_created function
         integrate_on_created(event,path_int)
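The Handler above only becomes active once it is scheduled on a watchdog observer; that wiring is not visible in these hunks (only a commented-out "Observing directory" print appears near the end of the file). A minimal sketch of how it is typically hooked up, with placeholder paths:

    import time
    from watchdog.observers import Observer

    handler = Handler("/data/images/", "/data/integrated_patterns/")   # placeholder paths
    observer = Observer()
    observer.schedule(handler, "/data/images/", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)          # keep the watcher alive
    except KeyboardInterrupt:
        observer.stop()
    observer.join()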
 
 if __name__ == '__main__':
+    # Check if the correct number of arguments is provided
+    # System arguments are expected in the following order:
+    # <path_im> <path_int> <fname_poni> <fname_mask> <NPROC> <POLARIZATION> <NPT> <UNIT> <ERRORMODE> <DATATYPE> <FORBIDDEN>
+    # System arguments are expected to be provided on the command line / in the .sh file
     if len(sys.argv) != 12:
         print("Usage: python maxwell_integrate_with_subdirs.py <path_im> <path_int> <fname_poni> <fname_mask> <NPROC> <POLARIZATION> <NPT> <UNIT> <ERRORMODE> <DATATYPE> <FORBIDDEN>")
         sys.exit(1)
@@ -419,27 +506,28 @@ if __name__ == '__main__':
         raise ValueError("POLARIZATION must be a float between 0 and 1")
     if not sys.argv[7].isdigit():
         raise ValueError("NPT must be a positive integer")
-    if not isinstance(sys.argv[8],str):
-
+    if not isinstance(sys.argv[8],str):
         raise ValueError("UNIT must be a string representing the unit (e.g., 'q_A^-1', 'q_nm^-1', 'q_ang^-1')")
     if not sys.argv[9].isalpha():
         raise ValueError("ERRORMODE must be a string representing the error model (e.g., 'poisson', 'azimuthal', 'none')")
     if not isinstance(sys.argv[10], str):
         raise ValueError("DATATYPE must be a string representing the data type (e.g., 'tif', 'tiff')")
-
-    path_im=sys.argv[1]
-    path_int=sys.argv[2]
-    fname_poni=sys.argv[3]
-    fname_mask=sys.argv[4]
-    NPROC=int(sys.argv[5])
-    POLARIZATION=float(sys.argv[6])
-    NPT=int(sys.argv[7])
-    UNIT=str(sys.argv[8])
-    ERRORMODE = str(sys.argv[9]).lower()
-    DATATYPE = str(sys.argv[10]).lower()
-    FORBIDDEN = sys.argv[11].split(',') if len(sys.argv) > 11 else []
-
+
+    # Parse the command line arguments
+    path_im=sys.argv[1]              # Path to images, e.g. "/data/images/"
+    path_int=sys.argv[2]             # Path to integrated patterns, e.g. "/data/integrated_patterns/"
+    fname_poni=sys.argv[3]           # File with poni parameters, e.g. "/data/poni/poni_file.poni"
+    fname_mask=sys.argv[4]           # File with mask, e.g. "/data/mask/mask.edf", or "None" if no mask is used
+    NPROC=int(sys.argv[5])           # Number of processes to use for integration
+    POLARIZATION=float(sys.argv[6])  # Polarization factor, e.g. 0.9
+    NPT=int(sys.argv[7])             # Number of points for integration
+    UNIT=str(sys.argv[8])            # Unit for q, e.g. "q_A^-1", "q_nm^-1", "q_ang^-1"
+    ERRORMODE = str(sys.argv[9]).lower()   # Error model, e.g. "poisson", "azimuthal", "none"
+    DATATYPE = str(sys.argv[10]).lower()   # Data type of the images, e.g. ".tif", ".tiff"
+    FORBIDDEN = sys.argv[11].split(',') if len(sys.argv) > 11 else []  # Forbidden substrings in filenames, e.g. "metadata,thumbs.db"
+
+    # Check if the provided arguments are valid
     if DATATYPE not in {".tif", ".tiff", ".TIF", ".TIFF"}:
         raise ValueError(f"Unsupported data type: {DATATYPE}")
     if UNIT not in {"q_A^-1", "q_nm^-1", "q_ang^-1"}:
@@ -461,13 +549,15 @@ if __name__ == '__main__':
     if ERRORMODE not in {"poisson", "azimuthal", "none"}:
         raise ValueError(f"Unsupported error model: {ERRORMODE}")
-
+    # Create the output directory if it does not exist
     if not os.path.isdir(path_int):
         os.mkdir(path_int)
 
+    # We open the poni file and the mask file (if provided)
     ai = pyFAI.load(fname_poni)
     mask = fabio.open(fname_mask).data if fname_mask else None
 
+    # Integrate images in the directory
     integrate_ims_in_dir(path_im, path_int)
 
     # print("Observing directory: " +str(path_im))
--
GitLab
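One last hedged remark on the mask handling near the end of the file: the argument comment above suggests passing the literal string "None" when no mask is used, but "if fname_mask" is true for any non-empty string, so fabio would then try to open a file literally called "None". A small sketch of a stricter check (the argument value is assumed):

    import fabio

    fname_mask = "None"   # assumed command-line value meaning "no mask"

    if fname_mask and fname_mask.lower() not in {"none", ""}:
        mask = fabio.open(fname_mask).data
    else:
        mask = None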