Skip to content
Snippets Groups Projects
Select Git revision
  • e2d8b571ed9400fb8decfcf186ae9850e9ddc26f
  • master default protected
  • devel
  • adaptive_step_size
4 results

ackwork_test.go

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    maxwell_integrate_to_h5.py 11.03 KiB
    
    #!/usr/bin/env python3
    import pyFAI
    import fabio
    import numpy as np
    import os
    import sys
    from time import sleep
    from glob import glob
    import logging
    logger = logging.getLogger()
    logger.setLevel(logging.CRITICAL)
    import threading
    from watchdog.observers.polling import PollingObserver
    from watchdog.events import PatternMatchingEventHandler
    from multiprocessing.pool import ThreadPool as Pool
    import pandas as pd
    
    global NPROC
    
    
    def integrate_ims_in_dir(path_im, path_int, dtype_im=".tif", dtype_int=".dat"):
        """
        Azimuthally integrate every image file found below <path_im> and write
        the patterns of each image directory to one CSV and one HDF5 file.

        The directory tree below <path_im> is mirrored below <path_int>; the two
        result files are named after the output directory they are written to.
        Files are selected by name only: a file is skipped when its name contains
        an entry of the global FORBIDDEN list or when its path contains
        "metadata". An image is also skipped when a pattern file
        <stem><dtype_int> already exists in its output directory.

        Reads the module globals NPROC, FORBIDDEN, NPT, UNIT, POLARIZATION,
        ERRORMODE, ai and mask.

        :param 'str' path_im: root directory that is searched recursively for images
        :param 'str' path_int: root directory below which the result files are written
        :param 'str' dtype_im: data type/filename ending of image file
        :param 'str' dtype_int: data type/filename ending of pattern file
        """
        global NPROC
        global FORBIDDEN

        # Group the candidate files by the directory they live in (one pass,
        # in os.walk order) instead of re-scanning one flat list per directory.
        images_by_dir = {}
        for path, _subdirs, files in os.walk(path_im):
            for name in files:
                if any(forbidden in name for forbidden in FORBIDDEN):
                    continue
                fname = os.path.join(path, name)
                images_by_dir.setdefault(os.path.dirname(fname), []).append(fname)

        # The command line spells "no error model" as the string "none";
        # pyFAI's integrate1d documents None for that case.
        error_model = None if ERRORMODE == "none" else ERRORMODE

        def integration_thread(fname_im, path_int):
            """Integrate one image; return its pattern dict, or None if skipped."""
            basename_im = os.path.basename(fname_im)
            if dtype_im and basename_im.lower().endswith(dtype_im.lower()):
                stem = basename_im[:-len(dtype_im)]
            else:
                # The ending does not match dtype_im: strip the real extension
                # instead of blindly cutting len(dtype_im) characters.
                stem = os.path.splitext(basename_im)[0]
            fname_int = os.path.join(path_int, stem + dtype_int)

            # Check for an existing pattern before paying for the image read.
            if os.path.isfile(fname_int):
                return None

            with fabio.open(fname_im) as image:
                im = image.data

            result = ai.integrate1d(
                data=im,
                npt=NPT,
                mask=mask,
                polarization_factor=POLARIZATION,
                correctSolidAngle=True,
                error_model=error_model,
                unit=UNIT,
            )
            q, I = result[0], result[1]
            # Without an error model only (q, I) is returned; use zero errors so
            # every row has the same columns.
            dI = result[2] if len(result) > 2 else np.zeros_like(I)

            return {
                "q": q,
                "I": I,
                "dI": dI,
                "filename": fname_im,
            }

        # One pool serves all directories; pool.map blocks until a directory is
        # done, and the pool is always shut down, even when a worker raises.
        pool = Pool(int(NPROC))
        try:
            for subdir, subdir_fnames in images_by_dir.items():
                filtered_fnames = [
                    fname_im for fname_im in subdir_fnames if "metadata" not in fname_im
                ]
                if not filtered_fnames:
                    print(f"No valid filenames found in subdirectory {subdir}.")
                    continue

                # Mirror the position of the image directory below path_int.
                rel_dir = os.path.relpath(subdir, path_im)
                if rel_dir == os.curdir:
                    subdir_path_int = path_int
                else:
                    subdir_path_int = os.path.join(path_int, rel_dir)
                # Create the output directory once, here, rather than racing on
                # os.mkdir from every worker thread; also handles nested levels.
                os.makedirs(subdir_path_int, exist_ok=True)

                results_data = [
                    result
                    for result in pool.map(
                        lambda fname_im, out_dir=subdir_path_int: integration_thread(
                            fname_im, out_dir
                        ),
                        filtered_fnames,
                    )
                    if result is not None  # skipped (already integrated) images
                ]

                if not results_data:
                    print(f"No images were integrated in subdirectory {subdir}. No results DataFrame created.")
                    continue

                # One row per image; the q/I/dI cells each hold a whole array.
                # NOTE(review): confirm the CSV rendering of these array cells is
                # sufficient for long patterns; the HDF5 file keeps the objects.
                results_df = pd.DataFrame(results_data)
                results_df = results_df.sort_values(by="filename", key=lambda col: col.str.lower())
                subdir_name = os.path.basename(os.path.normpath(subdir_path_int))
                results_df.to_csv(os.path.join(subdir_path_int, f"{subdir_name}.csv"), index=False)
                results_df.to_hdf(os.path.join(subdir_path_int, f"{subdir_name}.h5"), key='data', mode='w')
                print(f"Results for subdirectory {subdir_name} saved to CSV and HDF5 files.")
        finally:
            pool.close()
            pool.join()
    
            
    def integrate_on_created(event, path_int, dtype_im=".tif", dtype_int=".dat"):
        """
        Azimuthally integrate the image announced by a file-created event and
        append its pattern as one row to the module-level DataFrame `results_df`.

        Events whose path does not end with <dtype_im> (compared
        case-insensitively) or whose path contains "metadata" are ignored.
        Reads the module globals ERRORMODE, NPT, UNIT, POLARIZATION, ai and mask.

        :param 'watchdog.events.FileCreatedEvent' event: watchdog object to check for created files
        :param 'str' path_int: output directory for this image; created if missing
        :param 'str' dtype_im: data type/filename ending of image file
        :param 'str' dtype_int: data type/filename ending of pattern file (currently unused)
        :return: dict with keys "q", "I", "dI", "filename", or None if the event was ignored
        """
        # Without this declaration the assignments below would create a local
        # that is discarded on return (and the concat branch would raise).
        global results_df

        # makedirs also copes with nested output directories that do not exist yet.
        os.makedirs(path_int, exist_ok=True)

        src_path = event.src_path
        if not src_path.lower().endswith(dtype_im.lower()):
            return None
        # Filter on the file path, not on the pixel array.
        if "metadata" in src_path:
            return None

        with fabio.open(src_path) as image:
            im = image.data

        # The command line spells "no error model" as the string "none";
        # pyFAI's integrate1d documents None for that case.
        error_model = None if ERRORMODE == "none" else ERRORMODE
        result = ai.integrate1d(
            data=im,
            npt=NPT,
            mask=mask,
            polarization_factor=POLARIZATION,
            correctSolidAngle=True,
            error_model=error_model,
            unit=UNIT,
        )
        q, I = result[0], result[1]
        # Without an error model only (q, I) is returned; use zero errors so
        # every row has the same columns.
        dI = result[2] if len(result) > 2 else np.zeros_like(I)

        data = {
            "q": q,
            "I": I,
            "dI": dI,
            "filename": src_path,
        }

        # One row per image (array-valued cells), the same layout that
        # integrate_ims_in_dir builds from its list of result dicts.
        row = pd.DataFrame([data])
        if 'results_df' not in globals():
            results_df = row
        else:
            results_df = pd.concat([results_df, row], ignore_index=True)

        return data
    
    
    
    class Handler(PatternMatchingEventHandler):
        """
        Watchdog event handler that integrates newly created TIFF images.

        The directory of each created file, taken relative to <path_im>, is
        mirrored below <path_int> and passed to integrate_on_created as the
        output directory.
        """

        # Filter settings declared as class attributes.
        # NOTE(review): presumably these shadow the base-class properties of the
        # same names - confirm against the installed watchdog version.
        patterns = ["*tif","*tiff","*TIF","*TIFF"]
        ignore_patterns = []
        ignore_directories = True
        case_sensitive = True
        # Not referenced anywhere in this file; recursion is requested through
        # observer.schedule(..., recursive=True) in the (disabled) watch code.
        go_recursively = True

        def __init__(self, path_im, path_int):
            """
            :param 'str' path_im: root directory that is being watched for images
            :param 'str' path_int: root directory for the integration output
            """
            super().__init__()
            self.path_im = path_im
            self.path_int = path_int

        def on_created(self, event):
            """
            Map the created file's directory into the output tree and integrate it.

            No waiting for the end of the file transfer is done here; the image
            is opened as soon as the event arrives.

            :param 'watchdog.events.FileCreatedEvent' event: event of the created file
            """
            path_event = str(os.path.dirname(event.src_path))
            print(path_event)

            # relpath is insensitive to a trailing separator on path_im, so a
            # file directly in the watched root maps to "." instead of leaving
            # path_int unassigned.
            rel_dir = os.path.relpath(path_event, self.path_im)
            if rel_dir == os.curdir:
                path_int = self.path_int
            elif rel_dir == os.pardir or rel_dir.startswith(os.pardir + os.sep):
                # Event from outside the watched tree: there is no mirrored
                # output location for it, so it is ignored.
                return
            else:
                path_int = os.path.join(self.path_int, rel_dir)
            integrate_on_created(event,path_int)
    
    
    if __name__ == '__main__':
        # Command-line entry point: parse and validate the arguments, load the
        # pyFAI geometry and the optional mask, then integrate all existing
        # images below <path_im>.
        # Ten mandatory arguments plus an optional comma-separated FORBIDDEN list.
        if len(sys.argv) not in (11, 12):
            print("Usage: python maxwell_integrate_with_subdirs.py <path_im> <path_int> <fname_poni> <fname_mask> <NPROC> <POLARIZATION> <NPT> <UNIT> <ERRORMODE> <DATATYPE> [<FORBIDDEN>]")
            sys.exit(1)

        path_im = sys.argv[1]
        path_int = sys.argv[2]
        fname_poni = sys.argv[3]
        fname_mask = sys.argv[4]

        # Both directories are handled with a trailing slash throughout the file.
        if not path_im.endswith('/'):
            path_im += '/'
        if not path_int.endswith('/'):
            path_int += '/'

        if not fname_poni.endswith('.poni'):
            raise ValueError("The poni file must have a .poni extension")
        if not fname_mask.endswith('.mask') and fname_mask != "None":
            raise ValueError("The mask file must have a .mask extension or be 'None' if no mask is used")

        try:
            NPROC = int(sys.argv[5])
        except ValueError:
            raise ValueError("NPROC must be a positive integer") from None
        try:
            POLARIZATION = float(sys.argv[6])
        except ValueError:
            raise ValueError("POLARIZATION must be a float between 0 and 1") from None
        try:
            NPT = int(sys.argv[7])
        except ValueError:
            raise ValueError("NPT must be a positive integer") from None

        UNIT = sys.argv[8]
        ERRORMODE = sys.argv[9].lower()
        # Accept "tif" as well as ".TIF": lower-case and add the leading dot so
        # the value can be compared against the supported endings below.
        DATATYPE = sys.argv[10].lower()
        if not DATATYPE.startswith('.'):
            DATATYPE = '.' + DATATYPE
        # Drop empty entries: an empty string is a substring of every file name
        # and would exclude all files.
        FORBIDDEN = [entry for entry in sys.argv[11].split(',') if entry] if len(sys.argv) > 11 else []

        if DATATYPE not in {".tif", ".tiff"}:
            raise ValueError(f"Unsupported data type: {DATATYPE}")
        if UNIT not in {"q_A^-1", "q_nm^-1", "q_ang^-1"}:
            raise ValueError(f"Unsupported unit: {UNIT}")
        if NPROC <= 0:
            raise ValueError(f"Number of processes must be a positive integer: {NPROC}")
        # Written as a negated range check so that NaN is rejected as well.
        if not (0 <= POLARIZATION <= 1):
            raise ValueError(f"Polarization factor must be between 0 and 1: {POLARIZATION}")
        if NPT <= 0:
            raise ValueError(f"Number of points must be a positive integer: {NPT}")
        if not os.path.isdir(path_int):
            raise ValueError(f"Path to integrated patterns does not exist: {path_int}")
        if not os.path.isdir(path_im):
            raise ValueError(f"Path to images does not exist: {path_im}")
        if not os.path.isfile(fname_poni):
            raise ValueError(f"File with poni parameters does not exist: {fname_poni}")
        if not os.path.isfile(fname_mask) and fname_mask != "None":
            raise ValueError(f"File with mask does not exist: {fname_mask}")
        if ERRORMODE not in {"poisson", "azimuthal", "none"}:
            raise ValueError(f"Unsupported error model: {ERRORMODE}")

        ai = pyFAI.load(fname_poni)
        # "None" on the command line means "no mask"; it must not be opened as a file.
        mask = None if fname_mask == "None" else fabio.open(fname_mask).data

        integrate_ims_in_dir(path_im, path_int, dtype_im=DATATYPE)

        # Watch mode (integrate images as they are created) is currently disabled:
        # print("Observing directory:  " +str(path_im))

        # event_handler = Handler(path_im,path_int)
        # observer = PollingObserver()
        # observer.schedule(event_handler, path_im, recursive=True)
        # print("About to start observer")
        # observer.start()

        # try:
        #     while True:
        #         sleep(0.05)
        # except KeyboardInterrupt:
        #     observer.stop()
        #     observer.join()