From 31e4c9a599f3e41770ae5c54a8a3653cda737eca Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:20:25 -0800 Subject: [PATCH 01/21] Updating ALCF endpoints to include the synaps-i allocation (to be set up) --- config.yml | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/config.yml b/config.yml index 3f26a4f0..0d24832a 100644 --- a/config.yml +++ b/config.yml @@ -72,17 +72,23 @@ globus: uuid: 75b478b2-37af-46df-bfbd-71ed692c6506 name: data832_scratch - alcf832_raw: + alcf832_synaps: + root_path: / + uri: alcf.anl.gov + uuid: TBD + name: alcf832_synaps + + alcf832_iri_raw: root_path: /data/raw uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_raw + name: alcf_iri_raw - alcf832_scratch: + alcf832_iri_scratch: root_path: /data/scratch uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_scratch + name: alcf_iri_scratch alcf_eagle832: root_path: /IRIBeta/als/example From bf712a4f11af3717daf3b3c9722923e64ecb012d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:21:44 -0800 Subject: [PATCH 02/21] Updating bl832 config.py to distinguish IRI and SYNAPS-I ALCF endpoints --- orchestration/flows/bl832/config.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 788eef4a..7294b0a7 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -24,7 +24,8 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"] self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"] self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] - self.scicat = self.config["scicat"] - self.ghcr_images832 = self.config["ghcr_images832"] + self.alcf832_synaps = self.endpoints["alcf832_synaps"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] + self.scicat = config["scicat"] + self.ghcr_images832 = config["ghcr_images832"] From 77d6bc8a5187bc45b3d63a17569c61d15f934318 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:22:11 -0800 Subject: [PATCH 03/21] Adding the config.yaml file for setting up the globus compute endpoint for reconstruction on ALCF --- .../polaris/globus_compute_recon_config.yaml | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 scripts/polaris/globus_compute_recon_config.yaml diff --git a/scripts/polaris/globus_compute_recon_config.yaml b/scripts/polaris/globus_compute_recon_config.yaml new file mode 100644 index 00000000..66ffd331 --- /dev/null +++ b/scripts/polaris/globus_compute_recon_config.yaml @@ -0,0 +1,39 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 00:60:00 # Jobs will end after 60 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time From f4f9efa31338f482f6ef8e6b25410ead0ecc97a6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:22:54 -0800 Subject: [PATCH 04/21] Adding the config.yaml file for setting up the globus compute endpoint for segmentation on ALCF. Still needs to be configured for GPU and the environment with dependencies --- .../globus_compute_segment_config.yaml | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 scripts/polaris/globus_compute_segment_config.yaml diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml new file mode 100644 index 00000000..07bced00 --- /dev/null +++ b/scripts/polaris/globus_compute_segment_config.yaml @@ -0,0 +1,41 @@ +# This needs to be updated to use GPUs and a segmentation environment + +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 00:60:00 # Jobs will end after 60 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time From 0f5d5c9c0e9fcb71d5d5406ee6a312ae17615ea2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:23:53 -0800 Subject: [PATCH 05/21] Adding segmentation Prefect task, and segmentation globus compute code for the TomographyController. Turning off TIFF to ZARR on ALCF for the demo --- orchestration/flows/bl832/alcf.py | 195 +++++++++++++++++++++++++----- 1 file changed, 166 insertions(+), 29 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index bdf96ac2..c0126985 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -35,7 +35,7 @@ def __init__( # The block must be registered with the name "alcf-allocation-root-path" logger = get_run_logger() allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) - self.allocation_root = allocation_data.get("alcf-allocation-root-path") + self.allocation_root = allocation_data.get("alcf-allocation-root-path") # eagle/SYNAPS-I/ if not self.allocation_root: raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") logger.info(f"Allocation root loaded: {self.allocation_root}") @@ -57,17 +57,19 @@ def reconstruct( file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name - iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" - iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py" + rundir = f"{self.allocation_root}/data/bl832/raw" + recon_script = f"{self.allocation_root}/reconstruction/scripts/globus_reconstruction.py" gcc = Client(code_serialization_strategy=CombinedCode()) + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF") future = fxe.submit( self._reconstruct_wrapper, - iri_als_bl832_rundir, - iri_als_bl832_recon_script, + rundir, + recon_script, file_name, folder_name ) @@ -76,8 +78,8 @@ def reconstruct( @staticmethod def _reconstruct_wrapper( - rundir: str = "/eagle/IRIProd/ALS/data/raw", - script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py", + rundir: str = "/eagle/SYNAPS-I/data/bl832/raw", + script_path: str = "/eagle/SYNAPS-I/reconstruction/scripts/globus_reconstruction.py", h5_file_name: str = None, folder_path: str = None ) -> str: @@ -185,6 +187,101 @@ def _build_multi_resolution_wrapper( f"Converted tiff files to zarr;\n {zarr_res}" ) + def segmentation( + self, + folder_path: str = "", + ) -> bool: + """ + Run tomography segmentation at ALCF through Globus Compute. + + :param folder_path: Path to the TIFF folder to be processed. + + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + + # Operate on reconstructed data + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{Path(folder_path).name}" + output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{Path(folder_path).name}" + segmentation_script = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py" + + gcc = Client(code_serialization_strategy=CombinedCode()) + + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation + with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + logger.info(f"Running segmentation on {folder_path} at ALCF") + future = fxe.submit( + self._segmentation_wrapper, + input_dir=rundir, + output_dir=output_dir, + script_path=segmentation_script, + output_dir=folder_path, + ) + result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) + return result + + @staticmethod + def _segmentation_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", + script_path: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py", + nproc_per_node: int = 4, + nnodes: int = 1, + nnode_rank: int = 0, + master_addr: str = "localhost", + master_port: str = "29500", + patch_size: int = 512, + batch_size: int = 1, + num_workers: int = 4, + confidence: float = 0.5, + prompts: list[str] = ["background", "cell"], + ) -> str: + """ + Python function that wraps around the application call for segmentation on ALCF + + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the segmentation + :param folder_path: the path to the folder containing the TIFF data to be segmented + :return: confirmation message + """ + import os + import subprocess + import time + + seg_start = time.time() + + # Move to directory where data are located + os.chdir(input_dir) + + # Run segmentation.py + command = [ + "torchrun", + f"--nproc_per_node={nproc_per_node}", + f"--nnodes={nnodes}", + f"--node_rank={nnode_rank}", + f"--master_addr={master_addr}", + f"--master_port={master_port}", + "-m", script_path, + "--input-dir", input_dir, + "--output-dir", output_dir, + "--patch-size", str(patch_size), + "--batch-size", str(batch_size), + "--num-workers", str(num_workers), + "--confidence", str(confidence), + "--prompts", *prompts, + ] + + segment_res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + seg_end = time.time() + + print(f"Segmented data in {input_dir} in {seg_end-seg_start} seconds;\n {segment_res}") + return ( + f"Segmented data specified in {input_dir} in {seg_end-seg_start} seconds;\n" + f"{segment_res}" + ) + @staticmethod def _wait_for_globus_compute_future( future: Future, @@ -368,7 +465,7 @@ def alcf_recon_flow( config: Optional[Config832] = None, ) -> bool: """ - Process and transfer a file from a source to the ALCF. + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. Args: file_path (str): The path to the file to be processed. @@ -437,51 +534,91 @@ def alcf_recon_flow( destination=config.data832_scratch ) - # STEP 2B: Run the Tiff to Zarr Globus Flow - logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") - alcf_multi_res_success = tomography_controller.build_multi_resolution( - file_path=file_path, + # STEP 3: Run the Segmentation Task at ALCF + logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") + alcf_segmentation_success = alcf_segmentation_task( + recon_folder_path=scratch_path_tiff, + config=config ) - if not alcf_multi_res_success: - logger.error("Tiff to Zarr Failed.") - raise ValueError("Tiff to Zarr at ALCF Failed") + if not alcf_segmentation_success: + logger.warning("Segmentation at ALCF Failed") else: - logger.info("Tiff to Zarr Successful.") - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - f"at ALCF to {config.data832_scratch} at data832") - data832_zarr_transfer_success = transfer_controller.copy( - file_path=scratch_path_zarr, - source=config.alcf832_scratch, - destination=config.data832_scratch - ) + logger.info("Segmentation at ALCF Successful") + + # Not running TIFF to Zarr conversion at ALCF for now + # STEP 2B: Run the Tiff to Zarr Globus Flow + # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") + # alcf_multi_res_success = tomography_controller.build_multi_resolution( + # file_path=file_path, + # ) + # if not alcf_multi_res_success: + # logger.error("Tiff to Zarr Failed.") + # raise ValueError("Tiff to Zarr at ALCF Failed") + # else: + # logger.info("Tiff to Zarr Successful.") + # # Transfer B: Send reconstructed data (zarr) to data832 + # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # f"at ALCF to {config.data832_scratch} at data832") + # data832_zarr_transfer_success = transfer_controller.copy( + # file_path=scratch_path_zarr, + # source=config.alcf832_scratch, + # destination=config.data832_scratch + # ) # Place holder in case we want to transfer to NERSC for long term storage nersc_transfer_success = False - data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success + # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success schedule_pruning( alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, + # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, + # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr one_minute=False, # Set to False for production durations config=config ) # TODO: ingest to scicat - if alcf_reconstruction_success and alcf_multi_res_success: + if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: return True else: return False -if __name__ == "__main__": +@task(name="alcf_segmentation_task") +def alcf_segmentation_task( + recon_folder_path: str, + config: Optional[Config832] = None, +): + logger = get_run_logger() + if config is None: + logger.info("No config provided, using default Config832.") + config = Config832() + + # Initialize the Tomography Controller and run the segmentation + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF segmentation task for {recon_folder_path=}") + alcf_segmentation_success = tomography_controller.segmentation( + recon_folder_path=recon_folder_path, + ) + if not alcf_segmentation_success: + logger.error("Segmentation Failed.") + else: + logger.info("Segmentation Successful.") + return alcf_segmentation_success + + +@flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") +def alcf_segmentation_integration_test(): folder_name = 'dabramov' file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' flow_success = alcf_recon_flow( From d3ad2197ea9cbcdcabf013f2d87cb3cf13628dc2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:30:02 -0800 Subject: [PATCH 06/21] ensuring self.config for scicat and ghcr images --- orchestration/flows/bl832/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 7294b0a7..17a2afbe 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -27,5 +27,5 @@ def _beam_specific_config(self) -> None: self.alcf832_synaps = self.endpoints["alcf832_synaps"] self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] - self.scicat = config["scicat"] - self.ghcr_images832 = config["ghcr_images832"] + self.scicat = self.config["scicat"] + self.ghcr_images832 = self.config["ghcr_images832"] \ No newline at end of file From a96c4a87c26446e0a67961b7b1d04aa46c4f6146 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:30:39 -0800 Subject: [PATCH 07/21] linting --- orchestration/flows/bl832/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 17a2afbe..da753411 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -28,4 +28,4 @@ def _beam_specific_config(self) -> None: self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.scicat = self.config["scicat"] - self.ghcr_images832 = self.config["ghcr_images832"] \ No newline at end of file + self.ghcr_images832 = self.config["ghcr_images832"] From 5873d9412c3df6098b7bee23efaf48e0a2b27905 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 23 Jan 2026 10:57:25 -0800 Subject: [PATCH 08/21] Making separate ALCF SYNAPS-I endpoint configs for raw, reconstructed, and segmented data --- config.yml | 19 ++++++++++++++++--- orchestration/flows/bl832/alcf.py | 18 ++++++++++++------ orchestration/flows/bl832/config.py | 4 +++- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/config.yml b/config.yml index 0d24832a..b4d29a5d 100644 --- a/config.yml +++ b/config.yml @@ -46,6 +46,7 @@ globus: uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-beegfs-data + # 8.3.2 ENDPOINTS spot832: @@ -72,11 +73,23 @@ globus: uuid: 75b478b2-37af-46df-bfbd-71ed692c6506 name: data832_scratch - alcf832_synaps: - root_path: / + alcf832_synaps_raw: + root_path: /data/bl832/raw + uri: alcf.anl.gov + uuid: TBD + name: alcf832_synaps_raw + + alcf832_synaps_recon: + root_path: /data/bl832/scratch/reconstruction/ + uri: alcf.anl.gov + uuid: TBD + name: alcf832_synaps_recon + + alcf832_synaps_segment: + root_path: /data/bl832/scratch/segmentation/ uri: alcf.anl.gov uuid: TBD - name: alcf832_synaps + name: alcf832_synaps_segment alcf832_iri_raw: root_path: /data/raw diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index c0126985..766ae211 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -195,7 +195,6 @@ def segmentation( Run tomography segmentation at ALCF through Globus Compute. :param folder_path: Path to the TIFF folder to be processed. - :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -484,6 +483,7 @@ def alcf_recon_flow( file_name = path.stem h5_file_name = file_name + '.h5' scratch_path_tiff = folder_name + '/rec' + file_name + '/' + scratch_path_segment = folder_name + '/seg' + file_name + '/' scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' # initialize transfer_controller with globus @@ -498,7 +498,7 @@ def alcf_recon_flow( alcf_transfer_success = transfer_controller.copy( file_path=data832_raw_path, source=config.data832_raw, - destination=config.alcf832_raw + destination=config.alcf832_synaps_raw ) logger.info(f"Transfer status: {alcf_transfer_success}") @@ -526,11 +526,11 @@ def alcf_recon_flow( logger.info("Reconstruction Successful.") # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_tiff_transfer_success = transfer_controller.copy( file_path=scratch_path_tiff, - source=config.alcf832_scratch, + source=config.alcf832_synaps_recon, destination=config.data832_scratch ) @@ -544,6 +544,12 @@ def alcf_recon_flow( logger.warning("Segmentation at ALCF Failed") else: logger.info("Segmentation at ALCF Successful") + segment_transfer_success = transfer_controller.copy( + file_path=scratch_path_segment, + source=config.alcf832_synaps_segment, + destination=config.data832_scratch + ) + logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") # Not running TIFF to Zarr conversion at ALCF for now # STEP 2B: Run the Tiff to Zarr Globus Flow @@ -621,8 +627,8 @@ def alcf_segmentation_task( def alcf_segmentation_integration_test(): folder_name = 'dabramov' file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' - flow_success = alcf_recon_flow( - file_path=f"/{folder_name}/{file_name}.h5", + flow_success = alcf_segmentation_task( + recon_folder_path=f"/{folder_name}/{file_name}", config=Config832() ) print(flow_success) diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index da753411..d523952d 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -24,7 +24,9 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"] self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"] self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] - self.alcf832_synaps = self.endpoints["alcf832_synaps"] + self.alcf832_synaps_raw = self.endpoints["alcf832_synaps_raw"] + self.alcf832_synaps_recon = self.endpoints["alcf832_synaps_recon"] + self.alcf832_synaps_segment = self.endpoints["alcf832_synaps_segment"] self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.scicat = self.config["scicat"] From 49e6e7f881507f9a18f98cab217cb89e03bd65b9 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 11:53:46 -0800 Subject: [PATCH 09/21] Refactoring ALCF reconstruction flow to use the prune_controller class --- orchestration/flows/bl832/alcf.py | 356 +++++++++++++++++++----------- 1 file changed, 222 insertions(+), 134 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 766ae211..0794be99 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,5 +1,5 @@ from concurrent.futures import Future -import datetime +# import datetime from pathlib import Path import time from typing import Optional @@ -12,8 +12,9 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController +# from orchestration.prefect import schedule_prefect_flow +from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.transfer_controller import get_transfer_controller, CopyMethod -from orchestration.prefect import schedule_prefect_flow class ALCFTomographyHPCController(TomographyHPCController): @@ -189,33 +190,36 @@ def _build_multi_resolution_wrapper( def segmentation( self, - folder_path: str = "", + recon_folder_path: str = "", ) -> bool: """ Run tomography segmentation at ALCF through Globus Compute. - :param folder_path: Path to the TIFF folder to be processed. + :param recon_folder_path: Path to the reconstructed data folder to be processed. :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() # Operate on reconstructed data - rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{Path(folder_path).name}" - output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{Path(folder_path).name}" - segmentation_script = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py" + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{recon_folder_path}" + output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{recon_folder_path}" + segmentation_module = "src.inference" + workdir = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo" gcc = Client(code_serialization_strategy=CombinedCode()) # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID # We will probably have 2 endpoints, one for recon, one for segmentation - with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: - logger.info(f"Running segmentation on {folder_path} at ALCF") + endpoint_id = "168c595b-9493-42db-9c6a-aad960913de2" + # with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: + logger.info(f"Running segmentation on {recon_folder_path} at ALCF") future = fxe.submit( self._segmentation_wrapper, input_dir=rundir, output_dir=output_dir, - script_path=segmentation_script, - output_dir=folder_path, + script_module=segmentation_module, + workdir=workdir ) result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) return result @@ -224,7 +228,8 @@ def segmentation( def _segmentation_wrapper( input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", - script_path: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py", + script_module: str = "src.inference", + workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo", nproc_per_node: int = 4, nnodes: int = 1, nnode_rank: int = 0, @@ -250,18 +255,18 @@ def _segmentation_wrapper( seg_start = time.time() - # Move to directory where data are located - os.chdir(input_dir) + # Move to directory where the segmentation code is located + os.chdir(workdir) # Run segmentation.py command = [ - "torchrun", + "python", "-m", "torch.distributed.run", f"--nproc_per_node={nproc_per_node}", f"--nnodes={nnodes}", f"--node_rank={nnode_rank}", f"--master_addr={master_addr}", f"--master_port={master_port}", - "-m", script_path, + "-m", script_module, "--input-dir", input_dir, "--output-dir", output_dir, "--patch-size", str(patch_size), @@ -353,109 +358,109 @@ def _wait_for_globus_compute_future( return success -@task(name="schedule_prune_task") -def schedule_prune_task( - path: str, - location: str, - schedule_days: datetime.timedelta, - source_endpoint=None, - check_endpoint=None -) -> bool: - """ - Schedules a Prefect flow to prune files from a specified location. - - Args: - path (str): The file path to the folder containing the files. - location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. - schedule_days (int): The number of days after which the file should be deleted. - source_endpoint (str): The source endpoint for the files. - check_endpoint (str): The endpoint to check for the existence of the files. - - Returns: - bool: True if the task was scheduled successfully, False otherwise. - """ - logger = get_run_logger() - - try: - flow_name = f"delete {location}: {Path(path).name}" - schedule_prefect_flow( - deployment_name=f"prune_{location}/prune_{location}", - flow_run_name=flow_name, - parameters={ - "relative_path": path, - "source_endpoint": source_endpoint, - "check_endpoint": check_endpoint - }, - duration_from_now=schedule_days - ) - return True - except Exception as e: - logger.error(f"Failed to schedule prune task: {e}") - return False - - -@task(name="schedule_pruning") -def schedule_pruning( - alcf_raw_path: str = None, - alcf_scratch_path_tiff: str = None, - alcf_scratch_path_zarr: str = None, - nersc_scratch_path_tiff: str = None, - nersc_scratch_path_zarr: str = None, - data832_raw_path: str = None, - data832_scratch_path_tiff: str = None, - data832_scratch_path_zarr: str = None, - one_minute: bool = False, - config: Config832 = None -) -> bool: - """ - This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. - - Args: - alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. - alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. - alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. - nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. - nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. - data832_scratch_path (str, optional): The scratch path on data832. - one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. - config (Config832, optional): Configuration object for the flow. - - Returns: - bool: True if the tasks were scheduled successfully, False otherwise. - """ - logger = get_run_logger() - - pruning_config = Variable.get("pruning-config", _sync=True) - - if one_minute: - alcf_delay = datetime.timedelta(minutes=1) - nersc_delay = datetime.timedelta(minutes=1) - data832_delay = datetime.timedelta(minutes=1) - else: - alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) - nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) - data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - - # (path, location, days, source_endpoint, check_endpoint) - delete_schedules = [ - (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), - (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), - (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), - (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) - ] - - for path, location, days, source_endpoint, check_endpoint in delete_schedules: - if path: - schedule_prune_task(path, location, days, source_endpoint, check_endpoint) - logger.info(f"Scheduled delete from {location} at {days} days") - else: - logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") - - return True +# @task(name="schedule_prune_task") +# def schedule_prune_task( +# path: str, +# location: str, +# schedule_days: datetime.timedelta, +# source_endpoint=None, +# check_endpoint=None +# ) -> bool: +# """ +# Schedules a Prefect flow to prune files from a specified location. + +# Args: +# path (str): The file path to the folder containing the files. +# location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. +# schedule_days (int): The number of days after which the file should be deleted. +# source_endpoint (str): The source endpoint for the files. +# check_endpoint (str): The endpoint to check for the existence of the files. + +# Returns: +# bool: True if the task was scheduled successfully, False otherwise. +# """ +# logger = get_run_logger() + +# try: +# flow_name = f"delete {location}: {Path(path).name}" +# schedule_prefect_flow( +# deployment_name=f"prune_{location}/prune_{location}", +# flow_run_name=flow_name, +# parameters={ +# "relative_path": path, +# "source_endpoint": source_endpoint, +# "check_endpoint": check_endpoint +# }, +# duration_from_now=schedule_days +# ) +# return True +# except Exception as e: +# logger.error(f"Failed to schedule prune task: {e}") +# return False + + +# @task(name="schedule_pruning") +# def schedule_pruning( +# alcf_raw_path: str = None, +# alcf_scratch_path_tiff: str = None, +# alcf_scratch_path_zarr: str = None, +# nersc_scratch_path_tiff: str = None, +# nersc_scratch_path_zarr: str = None, +# data832_raw_path: str = None, +# data832_scratch_path_tiff: str = None, +# data832_scratch_path_zarr: str = None, +# one_minute: bool = False, +# config: Config832 = None +# ) -> bool: +# """ +# This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. + +# Args: +# alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. +# alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. +# alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. +# nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. +# nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. +# data832_scratch_path (str, optional): The scratch path on data832. +# one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. +# config (Config832, optional): Configuration object for the flow. + +# Returns: +# bool: True if the tasks were scheduled successfully, False otherwise. +# """ +# logger = get_run_logger() + +# pruning_config = Variable.get("pruning-config", _sync=True) + +# if one_minute: +# alcf_delay = datetime.timedelta(minutes=1) +# nersc_delay = datetime.timedelta(minutes=1) +# data832_delay = datetime.timedelta(minutes=1) +# else: +# alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) +# nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) +# data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) + +# # (path, location, days, source_endpoint, check_endpoint) +# delete_schedules = [ +# (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), +# (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), +# (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), +# (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), +# (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), +# (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), +# (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), +# (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) +# ] + +# for path, location, days, source_endpoint, check_endpoint in delete_schedules: +# if path: +# schedule_prune_task(path, location, days, source_endpoint, check_endpoint) +# logger.info(f"Scheduled delete from {location} at {days} days") +# else: +# logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") + +# return True @flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") @@ -533,6 +538,7 @@ def alcf_recon_flow( source=config.alcf832_synaps_recon, destination=config.data832_scratch ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") # STEP 3: Run the Segmentation Task at ALCF logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") @@ -552,6 +558,8 @@ def alcf_recon_flow( logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") # Not running TIFF to Zarr conversion at ALCF for now + alcf_multi_res_success = False + data832_zarr_transfer_success = False # STEP 2B: Run the Tiff to Zarr Globus Flow # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") # alcf_multi_res_success = tomography_controller.build_multi_resolution( @@ -572,22 +580,99 @@ def alcf_recon_flow( # ) # Place holder in case we want to transfer to NERSC for long term storage - nersc_transfer_success = False + # nersc_transfer_success = False - # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success - schedule_pruning( - alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now - nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, - nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, - data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr - one_minute=False, # Set to False for production durations + # STEP 4: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, config=config ) + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/segmentation + if alcf_segmentation_success: + logger.info("Scheduling pruning of ALCF scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.alcf832_synaps_segment, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune ZARR from ALCF scratch/reconstruction + if alcf_multi_res_success: + logger.info("Scheduling pruning of ALCF scratch zarr reconstruction data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune reconstructed ZARR from data832 scratch + if data832_zarr_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction ZARR data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune segmented data from data832 scratch + if alcf_segmentation_success: + logger.info("Scheduling pruning of data832 scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success + # schedule_pruning( + # alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, + # alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, + # # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now + # nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, + # nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, + # data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, + # data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, + # # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr + # one_minute=False, # Set to False for production durations + # config=config + # ) + # TODO: ingest to scicat if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: @@ -625,10 +710,13 @@ def alcf_segmentation_task( @flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") def alcf_segmentation_integration_test(): - folder_name = 'dabramov' - file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' + recon_folder_path = 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( - recon_folder_path=f"/{folder_name}/{file_name}", + recon_folder_path=recon_folder_path, config=Config832() ) print(flow_success) + + +if __name__ == "__main__": + alcf_segmentation_integration_test() From 922b7151f2c0c624d25f9adb7a720adadc2f911b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 11:54:47 -0800 Subject: [PATCH 10/21] Removing old commented out prune code --- orchestration/flows/bl832/alcf.py | 121 ------------------------------ 1 file changed, 121 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 0794be99..086a3668 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,5 +1,4 @@ from concurrent.futures import Future -# import datetime from pathlib import Path import time from typing import Optional @@ -12,7 +11,6 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController -# from orchestration.prefect import schedule_prefect_flow from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.transfer_controller import get_transfer_controller, CopyMethod @@ -358,111 +356,6 @@ def _wait_for_globus_compute_future( return success -# @task(name="schedule_prune_task") -# def schedule_prune_task( -# path: str, -# location: str, -# schedule_days: datetime.timedelta, -# source_endpoint=None, -# check_endpoint=None -# ) -> bool: -# """ -# Schedules a Prefect flow to prune files from a specified location. - -# Args: -# path (str): The file path to the folder containing the files. -# location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. -# schedule_days (int): The number of days after which the file should be deleted. -# source_endpoint (str): The source endpoint for the files. -# check_endpoint (str): The endpoint to check for the existence of the files. - -# Returns: -# bool: True if the task was scheduled successfully, False otherwise. -# """ -# logger = get_run_logger() - -# try: -# flow_name = f"delete {location}: {Path(path).name}" -# schedule_prefect_flow( -# deployment_name=f"prune_{location}/prune_{location}", -# flow_run_name=flow_name, -# parameters={ -# "relative_path": path, -# "source_endpoint": source_endpoint, -# "check_endpoint": check_endpoint -# }, -# duration_from_now=schedule_days -# ) -# return True -# except Exception as e: -# logger.error(f"Failed to schedule prune task: {e}") -# return False - - -# @task(name="schedule_pruning") -# def schedule_pruning( -# alcf_raw_path: str = None, -# alcf_scratch_path_tiff: str = None, -# alcf_scratch_path_zarr: str = None, -# nersc_scratch_path_tiff: str = None, -# nersc_scratch_path_zarr: str = None, -# data832_raw_path: str = None, -# data832_scratch_path_tiff: str = None, -# data832_scratch_path_zarr: str = None, -# one_minute: bool = False, -# config: Config832 = None -# ) -> bool: -# """ -# This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. - -# Args: -# alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. -# alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. -# alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. -# nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. -# nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. -# data832_scratch_path (str, optional): The scratch path on data832. -# one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. -# config (Config832, optional): Configuration object for the flow. - -# Returns: -# bool: True if the tasks were scheduled successfully, False otherwise. -# """ -# logger = get_run_logger() - -# pruning_config = Variable.get("pruning-config", _sync=True) - -# if one_minute: -# alcf_delay = datetime.timedelta(minutes=1) -# nersc_delay = datetime.timedelta(minutes=1) -# data832_delay = datetime.timedelta(minutes=1) -# else: -# alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) -# nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) -# data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - -# # (path, location, days, source_endpoint, check_endpoint) -# delete_schedules = [ -# (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), -# (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), -# (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), -# (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), -# (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), -# (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), -# (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), -# (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) -# ] - -# for path, location, days, source_endpoint, check_endpoint in delete_schedules: -# if path: -# schedule_prune_task(path, location, days, source_endpoint, check_endpoint) -# logger.info(f"Scheduled delete from {location} at {days} days") -# else: -# logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") - -# return True - - @flow(name="alcf_recon_flow", flow_run_name="alcf_recon-{file_path}") def alcf_recon_flow( file_path: str, @@ -659,20 +552,6 @@ def alcf_recon_flow( days_from_now=30.0 ) - # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success - # schedule_pruning( - # alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - # alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - # # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now - # nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, - # nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, - # data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - # data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - # # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr - # one_minute=False, # Set to False for production durations - # config=config - # ) - # TODO: ingest to scicat if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: From c9034117f1c3dd7a46cb4d07a70969590c3f96f7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 11:58:37 -0800 Subject: [PATCH 11/21] linting and docstrings --- orchestration/flows/bl832/alcf.py | 69 ++++++++++++++++--------------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 086a3668..57874b38 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -21,14 +21,18 @@ class ALCFTomographyHPCController(TomographyHPCController): There is a @staticmethod wrapper for each compute task submitted via Globus Compute. Also, there is a shared wait_for_globus_compute_future method that waits for the task to complete. - Args: - TomographyHPCController (ABC): Abstract class for tomography HPC controllers. + :param TomographyHPCController: Abstract class for tomography HPC controllers. """ def __init__( self, config: Config832 ) -> None: + """ + Initialize the ALCF Tomography HPC Controller. + + :param config: Configuration object for the controller. + """ super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" @@ -46,11 +50,8 @@ def reconstruct( """ Run tomography reconstruction at ALCF through Globus Compute. - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path : Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() file_name = Path(file_path).stem + ".h5" @@ -85,14 +86,11 @@ def _reconstruct_wrapper( """ Python function that wraps around the application call for Tomopy reconstruction on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will run the reconstruction - h5_file_name (str): the name of the h5 file to be reconstructed - folder_path (str): the path to the folder containing the h5 file - - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the reconstruction + :param h5_file_name: the name of the h5 file to be reconstructed + :param folder_path: the path to the folder containing the h5 file + :return: confirmation message """ import os import subprocess @@ -123,11 +121,8 @@ def build_multi_resolution( """ Tiff to Zarr code that is executed using Globus Compute - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path: Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -294,14 +289,11 @@ def _wait_for_globus_compute_future( """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. - Args: - future: The future object returned from the Globus Compute Executor submit method. - task_name: A descriptive name for the task being executed (used for logging). - check_interval: The interval (in seconds) between status checks. - walltime: The maximum time (in seconds) to wait for the task to complete. - - Returns: - bool: True if the task completed successfully within walltime, False otherwise. + :param future: The future object returned from the Globus Compute Executor submit method. + :param task_name: A descriptive name for the task being executed (used for logging). + :param check_interval: The interval (in seconds) between status checks. + :param walltime: The maximum time (in seconds) to wait for the task to complete. + :return: True if the task completed successfully within walltime, False otherwise. """ logger = get_run_logger() @@ -364,12 +356,9 @@ def alcf_recon_flow( """ Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. - Args: - file_path (str): The path to the file to be processed. - config (Config832): Configuration object for the flow. - - Returns: - bool: True if the flow completed successfully, False otherwise. + :param file_path: The path to the file to be processed. + :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. """ logger = get_run_logger() @@ -565,6 +554,13 @@ def alcf_segmentation_task( recon_folder_path: str, config: Optional[Config832] = None, ): + """ + Run segmentation task at ALCF. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :param config: Configuration object for the flow. + :return: True if the task completed successfully, False otherwise. + """ logger = get_run_logger() if config is None: logger.info("No config provided, using default Config832.") @@ -589,6 +585,11 @@ def alcf_segmentation_task( @flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") def alcf_segmentation_integration_test(): + """ + Integration test for the ALCF segmentation task. + + :return: None + """ recon_folder_path = 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( recon_folder_path=recon_folder_path, From 5a5cff45614b33fd5b1adde55e03f68089058895 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 12:04:49 -0800 Subject: [PATCH 12/21] Docstrings, linting, and type hints --- orchestration/flows/bl832/alcf.py | 44 ++++++++++++++++++------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 57874b38..ecdb84a2 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -159,13 +159,11 @@ def _build_multi_resolution_wrapper( """ Python function that wraps around the application call for Tiff to Zarr on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will convert the tiff files to zarr - recon_path (str): the path to the reconstructed data - raw_path (str): the path to the raw data - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will convert the tiff files to zarr + :param recon_path: the path to the reconstructed data + :param raw_path: the path to the raw data + :return: confirmation message """ import os import subprocess @@ -374,13 +372,14 @@ def alcf_recon_flow( scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) # STEP 1: Transfer data from data832 to ALCF - logger.info("Copying data to ALCF.") + logger.info("Copying raw data to ALCF.") data832_raw_path = f"{folder_name}/{h5_file_name}" alcf_transfer_success = transfer_controller.copy( file_path=data832_raw_path, @@ -395,14 +394,16 @@ def alcf_recon_flow( else: logger.info("Transfer to ALCF Successful.") - # STEP 2A: Run the Tomopy Reconstruction Globus Flow + # STEP 2: Run the Tomopy Reconstruction Globus Flow logger.info(f"Starting ALCF reconstruction flow for {file_path=}") # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") tomography_controller = get_controller( hpc_type=HPC.ALCF, config=config ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") alcf_reconstruction_success = tomography_controller.reconstruct( file_path=file_path, ) @@ -412,7 +413,7 @@ def alcf_recon_flow( else: logger.info("Reconstruction Successful.") - # Transfer A: Send reconstructed data (tiff) to data832 + # STEP 3: Send reconstructed data (tiff) to data832 logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_tiff_transfer_success = transfer_controller.copy( @@ -422,7 +423,7 @@ def alcf_recon_flow( ) logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") - # STEP 3: Run the Segmentation Task at ALCF + # STEP 4: Run the Segmentation Task at ALCF logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") alcf_segmentation_success = alcf_segmentation_task( recon_folder_path=scratch_path_tiff, @@ -432,6 +433,10 @@ def alcf_recon_flow( logger.warning("Segmentation at ALCF Failed") else: logger.info("Segmentation at ALCF Successful") + + # STEP 5: Send segmented data to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_segment} " + f"at ALCF to {config.data832_scratch} at data832") segment_transfer_success = transfer_controller.copy( file_path=scratch_path_segment, source=config.alcf832_synaps_segment, @@ -442,7 +447,7 @@ def alcf_recon_flow( # Not running TIFF to Zarr conversion at ALCF for now alcf_multi_res_success = False data832_zarr_transfer_success = False - # STEP 2B: Run the Tiff to Zarr Globus Flow + # STEP 6: Run the Tiff to Zarr Globus Flow # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") # alcf_multi_res_success = tomography_controller.build_multi_resolution( # file_path=file_path, @@ -452,7 +457,7 @@ def alcf_recon_flow( # raise ValueError("Tiff to Zarr at ALCF Failed") # else: # logger.info("Tiff to Zarr Successful.") - # # Transfer B: Send reconstructed data (zarr) to data832 + # # STEP 7: Send reconstructed data (zarr) to data832 # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " # f"at ALCF to {config.data832_scratch} at data832") # data832_zarr_transfer_success = transfer_controller.copy( @@ -464,7 +469,7 @@ def alcf_recon_flow( # Place holder in case we want to transfer to NERSC for long term storage # nersc_transfer_success = False - # STEP 4: Schedule Pruning of files + # STEP 8: Schedule Pruning of files logger.info("Scheduling file pruning tasks.") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, @@ -553,7 +558,7 @@ def alcf_recon_flow( def alcf_segmentation_task( recon_folder_path: str, config: Optional[Config832] = None, -): +) -> bool: """ Run segmentation task at ALCF. @@ -584,18 +589,21 @@ def alcf_segmentation_task( @flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") -def alcf_segmentation_integration_test(): +def alcf_segmentation_integration_test() -> bool: """ Integration test for the ALCF segmentation task. - :return: None + :return: True if the segmentation task completed successfully, False otherwise. """ + logger = get_run_logger() + logger.info("Starting ALCF segmentation integration test.") recon_folder_path = 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( recon_folder_path=recon_folder_path, config=Config832() ) - print(flow_success) + logger.info(f"Flow success: {flow_success}") + return flow_success if __name__ == "__main__": From b5e0ba9b7800ae7335455406b25850596ba8b9a0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 26 Jan 2026 12:05:42 -0800 Subject: [PATCH 13/21] Updating globus compute config for segmentation --- .../globus_compute_segment_config.yaml | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml index 07bced00..15f150ea 100644 --- a/scripts/polaris/globus_compute_segment_config.yaml +++ b/scripts/polaris/globus_compute_segment_config.yaml @@ -1,9 +1,8 @@ -# This needs to be updated to use GPUs and a segmentation environment - engine: type: GlobusComputeEngine # This engine uses the HighThroughputExecutor max_retries_on_system_failure: 2 max_workers: 1 # Sets one worker per node + max_workers_per_node: 4 prefetch_capacity: 0 # Increase if you have many more tasks than workers address: @@ -25,16 +24,24 @@ engine: overrides: --depth=64 --ppn 1 account: SYNAPS-I - queue: debug - cpus_per_node: 64 + queue: debug # debug (1-2 nodes), debug-scaling (1-10 nodes), or some other queue, probably want demand (1-56 nodes) for real-time things, prod (496 nodes) + # minimum node 1, max 56 nodes. Max time 59 minutes + cpus_per_node: 32 # may want to change to 4 (only 4 GPUs per node) # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" scheduler_options: "#PBS -l filesystems=home:eagle" - # Node setup: activate necessary conda environment and such - worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" - - walltime: 00:60:00 # Jobs will end after 60 minutes + # worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/segmentation/env/; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_segmentation" + worker_init: | + module use /soft/modulefiles + module load conda + conda activate base + source /eagle/SYNAPS-I/segmentation/env/bin/activate + export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + cd /eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo + + walltime: 59:00 # Jobs will end after 59 minutes nodes_per_block: 2 # All jobs will have 1 node init_blocks: 0 min_blocks: 0 From 54dab5df4239935c71c9e24a0ba236e8f3fe09b4 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 28 Jan 2026 11:04:24 -0800 Subject: [PATCH 14/21] turning ALCF recon+segmentation into a separate flow from recon+zarr conversion --- orchestration/flows/bl832/alcf.py | 213 +++++++++++++++++++++++------- 1 file changed, 165 insertions(+), 48 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index ecdb84a2..9b8bdb28 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -368,7 +368,6 @@ def alcf_recon_flow( file_name = path.stem h5_file_name = file_name + '.h5' scratch_path_tiff = folder_name + '/rec' + file_name + '/' - scratch_path_segment = folder_name + '/seg' + file_name + '/' scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' # initialize transfer_controller with globus @@ -388,6 +387,169 @@ def alcf_recon_flow( ) logger.info(f"Transfer status: {alcf_transfer_success}") + if not alcf_transfer_success: + logger.error("Transfer failed due to configuration or authorization issues.") + raise ValueError("Transfer to ALCF Failed") + else: + logger.info("Transfer to ALCF Successful.") + + # STEP 2: Run Tomopy Reconstruction on Globus Compute + logger.info(f"Starting ALCF reconstruction flow for {file_path=}") + + # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") + alcf_reconstruction_success = tomography_controller.reconstruct( + file_path=file_path, + ) + if not alcf_reconstruction_success: + logger.error("Reconstruction Failed.") + raise ValueError("Reconstruction at ALCF Failed") + else: + logger.info("Reconstruction Successful.") + + # STEP 3: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " + f"at ALCF to {config.data832_scratch} at data832") + data832_tiff_transfer_success = transfer_controller.copy( + file_path=scratch_path_tiff, + source=config.alcf832_synaps_recon, + destination=config.data832_scratch + ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") + + # STEP 4: Run the Tiff to Zarr Globus Flow + logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") + alcf_multi_res_success = tomography_controller.build_multi_resolution( + file_path=file_path, + ) + if not alcf_multi_res_success: + logger.error("Tiff to Zarr Failed.") + raise ValueError("Tiff to Zarr at ALCF Failed") + else: + logger.info("Tiff to Zarr Successful.") + # STEP 5: Send reconstructed data (zarr) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + f"at ALCF to {config.data832_scratch} at data832") + data832_zarr_transfer_success = transfer_controller.copy( + file_path=scratch_path_zarr, + source=config.alcf832_scratch, + destination=config.data832_scratch + ) + + # Place holder in case we want to transfer to NERSC for long term storage + # nersc_transfer_success = False + + # STEP 6: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, + config=config + ) + + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune ZARR from ALCF scratch/reconstruction + if alcf_multi_res_success: + logger.info("Scheduling pruning of ALCF scratch zarr reconstruction data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune reconstructed ZARR from data832 scratch + if data832_zarr_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction ZARR data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # TODO: ingest to scicat + + if alcf_reconstruction_success and alcf_multi_res_success: + return True + else: + return False + + +@flow(name="forge_alcf_recon_segment_flow", flow_run_name="alcf_recon_seg-{file_path}") +def forge_alcf_recon_segment_flow( + file_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. + + :param file_path: The path to the file to be processed. + :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. + """ + logger = get_run_logger() + + if config is None: + config = Config832() + # set up file paths + path = Path(file_path) + folder_name = path.parent.name + file_name = path.stem + h5_file_name = file_name + '.h5' + scratch_path_tiff = folder_name + '/rec' + file_name + '/' + scratch_path_segment = folder_name + '/seg' + file_name + '/' + + # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + # STEP 1: Transfer data from data832 to ALCF + logger.info("Copying raw data to ALCF.") + data832_raw_path = f"{folder_name}/{h5_file_name}" + alcf_transfer_success = transfer_controller.copy( + file_path=data832_raw_path, + source=config.data832_raw, + destination=config.alcf832_synaps_raw + ) + logger.info(f"Transfer status: {alcf_transfer_success}") + if not alcf_transfer_success: logger.error("Transfer failed due to configuration or authorization issues.") raise ValueError("Transfer to ALCF Failed") @@ -444,32 +606,7 @@ def alcf_recon_flow( ) logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") - # Not running TIFF to Zarr conversion at ALCF for now - alcf_multi_res_success = False - data832_zarr_transfer_success = False - # STEP 6: Run the Tiff to Zarr Globus Flow - # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") - # alcf_multi_res_success = tomography_controller.build_multi_resolution( - # file_path=file_path, - # ) - # if not alcf_multi_res_success: - # logger.error("Tiff to Zarr Failed.") - # raise ValueError("Tiff to Zarr at ALCF Failed") - # else: - # logger.info("Tiff to Zarr Successful.") - # # STEP 7: Send reconstructed data (zarr) to data832 - # logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " - # f"at ALCF to {config.data832_scratch} at data832") - # data832_zarr_transfer_success = transfer_controller.copy( - # file_path=scratch_path_zarr, - # source=config.alcf832_scratch, - # destination=config.data832_scratch - # ) - - # Place holder in case we want to transfer to NERSC for long term storage - # nersc_transfer_success = False - - # STEP 8: Schedule Pruning of files + # STEP 6: Schedule Pruning of files logger.info("Scheduling file pruning tasks.") prune_controller = get_prune_controller( prune_type=PruneMethod.GLOBUS, @@ -506,16 +643,6 @@ def alcf_recon_flow( days_from_now=2.0 ) - # Prune ZARR from ALCF scratch/reconstruction - if alcf_multi_res_success: - logger.info("Scheduling pruning of ALCF scratch zarr reconstruction data.") - prune_controller.prune( - file_path=scratch_path_zarr, - source_endpoint=config.alcf832_synaps_recon, - check_endpoint=config.data832_scratch, - days_from_now=2.0 - ) - # Prune reconstructed TIFFs from data832 scratch if data832_tiff_transfer_success: logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") @@ -526,16 +653,6 @@ def alcf_recon_flow( days_from_now=30.0 ) - # Prune reconstructed ZARR from data832 scratch - if data832_zarr_transfer_success: - logger.info("Scheduling pruning of data832 scratch reconstruction ZARR data.") - prune_controller.prune( - file_path=scratch_path_zarr, - source_endpoint=config.data832_scratch, - check_endpoint=None, - days_from_now=30.0 - ) - # Prune segmented data from data832 scratch if alcf_segmentation_success: logger.info("Scheduling pruning of data832 scratch segmentation data.") @@ -548,7 +665,7 @@ def alcf_recon_flow( # TODO: ingest to scicat - if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success: + if alcf_reconstruction_success and alcf_segmentation_success: return True else: return False From 6361a332a792f2e1718a8ae8c87fddd40f624b6a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 28 Jan 2026 11:22:42 -0800 Subject: [PATCH 15/21] updating pytest for alcf reconstruction --- orchestration/_tests/test_globus_flow.py | 39 +++++++++++++----------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 4e424bad..6459815e 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -147,8 +147,8 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_raw")), "nersc832_alsdev_scratch": MockEndpoint("mock_nersc832_alsdev_scratch_path", MockSecret.for_endpoint("nersc832_alsdev_scratch")), - "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), - "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "alcf832_iri_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_iri_raw")), + "alcf832_iri_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_iri_scratch")), } # Mock apps @@ -163,8 +163,8 @@ def __init__(self) -> None: self.spot832 = self.endpoints["spot832"] self.data832 = self.endpoints["data832"] self.nersc832 = self.endpoints["nersc832"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] @@ -247,8 +247,11 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_pscratch_raw": mocker.MagicMock(), "nersc832_alsdev_pscratch_scratch": mocker.MagicMock(), "nersc832_alsdev_recon_scripts": mocker.MagicMock(), - "alcf832_raw": mocker.MagicMock(), - "alcf832_scratch": mocker.MagicMock(), + "alcf832_iri_raw": mocker.MagicMock(), + "alcf832_iri_scratch": mocker.MagicMock(), + "alcf832_synaps_raw": mocker.MagicMock(), + "alcf832_synaps_recon": mocker.MagicMock(), + "alcf832_synaps_segment": mocker.MagicMock(), } ) mocker.patch( @@ -298,10 +301,12 @@ def test_alcf_recon_flow(mocker: MockFixture): return_value=mock_transfer_controller ) - # 7) Patch schedule_pruning => skip real scheduling - mock_schedule_pruning = mocker.patch( - "orchestration.flows.bl832.alcf.schedule_pruning", - return_value=True + # 7) Patch get_prune_controller(...) => skip real scheduling + mock_prune_controller = mocker.MagicMock() + mock_prune_controller.prune.return_value = True + mocker.patch( + "orchestration.flows.bl832.alcf.get_prune_controller", + return_value=mock_prune_controller ) file_path = "/global/raw/transfer_tests/test.h5" @@ -316,13 +321,13 @@ def test_alcf_recon_flow(mocker: MockFixture): assert mock_transfer_controller.copy.call_count == 3, "Should do 3 transfers in success path" mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() - mock_schedule_pruning.assert_called_once() + assert mock_prune_controller.prune.call_count == 5, "Should schedule 5 prune operations in success path" # Reset for next scenario mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # # ---------- CASE 2: HPC reconstruction fails ---------- @@ -339,13 +344,13 @@ def test_alcf_recon_flow(mocker: MockFixture): assert mock_transfer_controller.copy.call_count == 1, ( "Should only do the first data832->alcf copy before HPC fails" ) - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() # Reset mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # ---------- CASE 3: Tiff->Zarr fails ---------- mock_transfer_controller.copy.return_value = True @@ -360,13 +365,13 @@ def test_alcf_recon_flow(mocker: MockFixture): # HPC is done, so there's 2 successful transfer (data832->alcf). # We have not transferred tiff or zarr => total 2 copies assert mock_transfer_controller.copy.call_count == 2 - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() # Reset mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # ---------- CASE 4: data832->ALCF fails immediately ---------- mock_transfer_controller.copy.return_value = False @@ -380,4 +385,4 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_hpc_multires.assert_not_called() # The only call is the failing copy mock_transfer_controller.copy.assert_called_once() - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() From f96c5cd7243c0885a1624b9783ec79e3a6e480b7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 28 Jan 2026 11:22:56 -0800 Subject: [PATCH 16/21] Adjusting endpoint names for synaps --- orchestration/flows/bl832/alcf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 9b8bdb28..84b22692 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -433,11 +433,11 @@ def alcf_recon_flow( else: logger.info("Tiff to Zarr Successful.") # STEP 5: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_zarr_transfer_success = transfer_controller.copy( file_path=scratch_path_zarr, - source=config.alcf832_scratch, + source=config.alcf832_synaps_recon, destination=config.data832_scratch ) From 8324b7d8b4a6554b42be46e36c9ad31d0a2da7c3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 28 Jan 2026 11:44:35 -0800 Subject: [PATCH 17/21] adding the alcf_forge_recon_segment flow to prefect.yaml as a separate deployment --- orchestration/flows/bl832/alcf.py | 4 ++-- orchestration/flows/bl832/prefect.yaml | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 84b22692..da864c83 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -509,8 +509,8 @@ def alcf_recon_flow( return False -@flow(name="forge_alcf_recon_segment_flow", flow_run_name="alcf_recon_seg-{file_path}") -def forge_alcf_recon_segment_flow( +@flow(name="alcf_forge_recon_segment_flow", flow_run_name="alcf_recon_seg-{file_path}") +def alcf_forge_recon_segment_flow( file_path: str, config: Optional[Config832] = None, ) -> bool: diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml index a1d4613b..20858610 100644 --- a/orchestration/flows/bl832/prefect.yaml +++ b/orchestration/flows/bl832/prefect.yaml @@ -55,6 +55,12 @@ deployments: name: alcf_recon_flow_pool work_queue_name: alcf_recon_flow_queue +- alcf_forge_recon_segment_flow: + entrypoint: orchestration/flows/bl832/alcf.py:alcf_forge_recon_segment_flow + work_pool: + name: alcf_recon_flow_pool + work_queue_name: alcf_forge_recon_segment_flow_queue + # Pruning flows - name: prune_globus_endpoint entrypoint: orchestration/prune_controller.py:prune_globus_endpoint From 7599f2e6a55d3b2807d0a60a306383b7096a70f1 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 28 Jan 2026 11:45:02 -0800 Subject: [PATCH 18/21] updating bl832 dispatcher to include alcf_forge_recon_segment as a separate option --- orchestration/flows/bl832/dispatcher.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cf1d0c64..7c799c2b 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,9 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -55,22 +58,26 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") -def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict: +def setup_decision_settings(alcf_recon: bool, alcf_forge_recon_segment: bool, nersc_recon: bool, new_file_832: bool) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF forge reconstruction segmentation flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. - :param nersc_move: Boolean indicating whether to move files to NERSC. + :param new_file_832: Boolean indicating whether to run the new file 832 flow. :return: A dictionary containing the settings for each flow. """ logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " - f"nersc_recon={nersc_recon}, new_file_832={new_file_832}") + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"nersc_recon={nersc_recon}, " + f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "new_832_file_flow/new_file_832": new_file_832 } @@ -145,6 +152,13 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) @@ -169,7 +183,7 @@ async def dispatcher( """ try: # Setup decision settings based on input parameters - setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True) + setup_decision_settings(alcf_recon=True, alcf_forge_recon_segment=False, nersc_recon=True, new_file_832=True) # Run the main decision flow with the specified parameters # asyncio.run(dispatcher( # config={}, # PYTEST, ALCF, NERSC From d78c98db377c4535878ca68a910f664d555c4312 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:56:57 -0800 Subject: [PATCH 19/21] adding transfer client uuid for ALCF SYNAPS-I --- config.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config.yml b/config.yml index b4d29a5d..f224ce14 100644 --- a/config.yml +++ b/config.yml @@ -76,19 +76,19 @@ globus: alcf832_synaps_raw: root_path: /data/bl832/raw uri: alcf.anl.gov - uuid: TBD + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 name: alcf832_synaps_raw alcf832_synaps_recon: root_path: /data/bl832/scratch/reconstruction/ uri: alcf.anl.gov - uuid: TBD + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 name: alcf832_synaps_recon alcf832_synaps_segment: root_path: /data/bl832/scratch/segmentation/ uri: alcf.anl.gov - uuid: TBD + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 name: alcf832_synaps_segment alcf832_iri_raw: From 3c0f25eaaaaf9ea54cb0b956e26c40cdbce5f8e8 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:57:20 -0800 Subject: [PATCH 20/21] this configuration worked for launching segmentation on 1 GPU --- orchestration/flows/bl832/alcf.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index da864c83..ac122f30 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -221,7 +221,7 @@ def _segmentation_wrapper( output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", script_module: str = "src.inference", workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo", - nproc_per_node: int = 4, + nproc_per_node: int = 4, # 1 works nnodes: int = 1, nnode_rank: int = 0, master_addr: str = "localhost", @@ -267,7 +267,10 @@ def _segmentation_wrapper( "--prompts", *prompts, ] - segment_res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + segment_res = subprocess.run(command) # stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + if segment_res.returncode != 0: + raise RuntimeError(f"Segmentation failed with return code {segment_res.returncode}") seg_end = time.time() @@ -282,7 +285,7 @@ def _wait_for_globus_compute_future( future: Future, task_name: str, check_interval: int = 20, - walltime: int = 1200 # seconds = 20 minutes + walltime: int = 3600 # seconds = 60 minutes ) -> bool: """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. @@ -714,7 +717,7 @@ def alcf_segmentation_integration_test() -> bool: """ logger = get_run_logger() logger.info("Starting ALCF segmentation integration test.") - recon_folder_path = 'rec20211222_125057_petiole4' + recon_folder_path = 'test' # 'rec20211222_125057_petiole4' flow_success = alcf_segmentation_task( recon_folder_path=recon_folder_path, config=Config832() From e6ebd1f609cc35047f590e1f99af50dee8f0bc38 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:57:39 -0800 Subject: [PATCH 21/21] Updating segmentation compute endpoint config --- scripts/polaris/globus_compute_segment_config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml index 15f150ea..6479e84f 100644 --- a/scripts/polaris/globus_compute_segment_config.yaml +++ b/scripts/polaris/globus_compute_segment_config.yaml @@ -1,7 +1,7 @@ engine: type: GlobusComputeEngine # This engine uses the HighThroughputExecutor max_retries_on_system_failure: 2 - max_workers: 1 # Sets one worker per node + # max_workers: 1 # Sets one worker per node max_workers_per_node: 4 prefetch_capacity: 0 # Increase if you have many more tasks than workers @@ -29,7 +29,7 @@ engine: cpus_per_node: 32 # may want to change to 4 (only 4 GPUs per node) # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" - scheduler_options: "#PBS -l filesystems=home:eagle" + scheduler_options: "#PBS -l filesystems=home:eagle -l select=1:ngpus=4" # Node setup: activate necessary conda environment and such # worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/segmentation/env/; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_segmentation" worker_init: |