diff --git a/config.yml b/config.yml index 3f26a4f0..f224ce14 100644 --- a/config.yml +++ b/config.yml @@ -46,6 +46,7 @@ globus: uri: beegfs.als.lbl.gov uuid: d33b5d6e-1603-414e-93cb-bcb732b7914a name: bl733-beegfs-data + # 8.3.2 ENDPOINTS spot832: @@ -72,17 +73,35 @@ globus: uuid: 75b478b2-37af-46df-bfbd-71ed692c6506 name: data832_scratch - alcf832_raw: + alcf832_synaps_raw: + root_path: /data/bl832/raw + uri: alcf.anl.gov + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 + name: alcf832_synaps_raw + + alcf832_synaps_recon: + root_path: /data/bl832/scratch/reconstruction/ + uri: alcf.anl.gov + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 + name: alcf832_synaps_recon + + alcf832_synaps_segment: + root_path: /data/bl832/scratch/segmentation/ + uri: alcf.anl.gov + uuid: 728a8e30-32ef-4000-814c-f9ccbc00bf13 + name: alcf832_synaps_segment + + alcf832_iri_raw: root_path: /data/raw uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_raw + name: alcf_iri_raw - alcf832_scratch: + alcf832_iri_scratch: root_path: /data/scratch uri: alcf.anl.gov uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7 - name: alcf_scratch + name: alcf_iri_scratch alcf_eagle832: root_path: /IRIBeta/als/example diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index 4e424bad..6459815e 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -147,8 +147,8 @@ def __init__(self) -> None: MockSecret.for_endpoint("nersc832_alsdev_raw")), "nersc832_alsdev_scratch": MockEndpoint("mock_nersc832_alsdev_scratch_path", MockSecret.for_endpoint("nersc832_alsdev_scratch")), - "alcf832_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_raw")), - "alcf832_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_scratch")), + "alcf832_iri_raw": MockEndpoint("mock_alcf832_raw_path", MockSecret.for_endpoint("alcf832_iri_raw")), + "alcf832_iri_scratch": MockEndpoint("mock_alcf832_scratch_path", MockSecret.for_endpoint("alcf832_iri_scratch")), } # Mock apps @@ -163,8 +163,8 @@ def __init__(self) -> None: self.spot832 = self.endpoints["spot832"] self.data832 = self.endpoints["data832"] self.nersc832 = self.endpoints["nersc832"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.data832_raw = self.endpoints["data832_raw"] self.data832_scratch = self.endpoints["data832_scratch"] self.nersc832_alsdev_scratch = self.endpoints["nersc832_alsdev_scratch"] @@ -247,8 +247,11 @@ def test_alcf_recon_flow(mocker: MockFixture): "nersc832_alsdev_pscratch_raw": mocker.MagicMock(), "nersc832_alsdev_pscratch_scratch": mocker.MagicMock(), "nersc832_alsdev_recon_scripts": mocker.MagicMock(), - "alcf832_raw": mocker.MagicMock(), - "alcf832_scratch": mocker.MagicMock(), + "alcf832_iri_raw": mocker.MagicMock(), + "alcf832_iri_scratch": mocker.MagicMock(), + "alcf832_synaps_raw": mocker.MagicMock(), + "alcf832_synaps_recon": mocker.MagicMock(), + "alcf832_synaps_segment": mocker.MagicMock(), } ) mocker.patch( @@ -298,10 +301,12 @@ def test_alcf_recon_flow(mocker: MockFixture): return_value=mock_transfer_controller ) - # 7) Patch schedule_pruning => skip real scheduling - mock_schedule_pruning = mocker.patch( - "orchestration.flows.bl832.alcf.schedule_pruning", - return_value=True + # 7) Patch get_prune_controller(...) 
=> skip real scheduling + mock_prune_controller = mocker.MagicMock() + mock_prune_controller.prune.return_value = True + mocker.patch( + "orchestration.flows.bl832.alcf.get_prune_controller", + return_value=mock_prune_controller ) file_path = "/global/raw/transfer_tests/test.h5" @@ -316,13 +321,13 @@ def test_alcf_recon_flow(mocker: MockFixture): assert mock_transfer_controller.copy.call_count == 3, "Should do 3 transfers in success path" mock_hpc_reconstruct.assert_called_once() mock_hpc_multires.assert_called_once() - mock_schedule_pruning.assert_called_once() + assert mock_prune_controller.prune.call_count == 5, "Should schedule 5 prune operations in success path" # Reset for next scenario mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # # ---------- CASE 2: HPC reconstruction fails ---------- @@ -339,13 +344,13 @@ def test_alcf_recon_flow(mocker: MockFixture): assert mock_transfer_controller.copy.call_count == 1, ( "Should only do the first data832->alcf copy before HPC fails" ) - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() # Reset mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # ---------- CASE 3: Tiff->Zarr fails ---------- mock_transfer_controller.copy.return_value = True @@ -360,13 +365,13 @@ def test_alcf_recon_flow(mocker: MockFixture): # HPC is done, so there's 2 successful transfer (data832->alcf). # We have not transferred tiff or zarr => total 2 copies assert mock_transfer_controller.copy.call_count == 2 - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() # Reset mock_transfer_controller.copy.reset_mock() mock_hpc_reconstruct.reset_mock() mock_hpc_multires.reset_mock() - mock_schedule_pruning.reset_mock() + mock_prune_controller.prune.reset_mock() # ---------- CASE 4: data832->ALCF fails immediately ---------- mock_transfer_controller.copy.return_value = False @@ -380,4 +385,4 @@ def test_alcf_recon_flow(mocker: MockFixture): mock_hpc_multires.assert_not_called() # The only call is the failing copy mock_transfer_controller.copy.assert_called_once() - mock_schedule_pruning.assert_not_called() + mock_prune_controller.prune.assert_not_called() diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index bdf96ac2..ac122f30 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -1,5 +1,4 @@ from concurrent.futures import Future -import datetime from pathlib import Path import time from typing import Optional @@ -12,8 +11,8 @@ from orchestration.flows.bl832.config import Config832 from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController +from orchestration.prune_controller import get_prune_controller, PruneMethod from orchestration.transfer_controller import get_transfer_controller, CopyMethod -from orchestration.prefect import schedule_prefect_flow class ALCFTomographyHPCController(TomographyHPCController): @@ -22,20 +21,24 @@ class ALCFTomographyHPCController(TomographyHPCController): There is a @staticmethod wrapper for each compute task submitted via Globus Compute. Also, there is a shared wait_for_globus_compute_future method that waits for the task to complete. 
- Args: - TomographyHPCController (ABC): Abstract class for tomography HPC controllers. + :param TomographyHPCController: Abstract class for tomography HPC controllers. """ def __init__( self, config: Config832 ) -> None: + """ + Initialize the ALCF Tomography HPC Controller. + + :param config: Configuration object for the controller. + """ super().__init__(config) # Load allocation root from the Prefect JSON block # The block must be registered with the name "alcf-allocation-root-path" logger = get_run_logger() allocation_data = Variable.get("alcf-allocation-root-path", _sync=True) - self.allocation_root = allocation_data.get("alcf-allocation-root-path") + self.allocation_root = allocation_data.get("alcf-allocation-root-path") # eagle/SYNAPS-I/ if not self.allocation_root: raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'") logger.info(f"Allocation root loaded: {self.allocation_root}") @@ -47,27 +50,26 @@ def reconstruct( """ Run tomography reconstruction at ALCF through Globus Compute. - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path : Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() file_name = Path(file_path).stem + ".h5" folder_name = Path(file_path).parent.name - iri_als_bl832_rundir = f"{self.allocation_root}/data/raw" - iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py" + rundir = f"{self.allocation_root}/data/bl832/raw" + recon_script = f"{self.allocation_root}/reconstruction/scripts/globus_reconstruction.py" gcc = Client(code_serialization_strategy=CombinedCode()) + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF") future = fxe.submit( self._reconstruct_wrapper, - iri_als_bl832_rundir, - iri_als_bl832_recon_script, + rundir, + recon_script, file_name, folder_name ) @@ -76,22 +78,19 @@ def reconstruct( @staticmethod def _reconstruct_wrapper( - rundir: str = "/eagle/IRIProd/ALS/data/raw", - script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py", + rundir: str = "/eagle/SYNAPS-I/data/bl832/raw", + script_path: str = "/eagle/SYNAPS-I/reconstruction/scripts/globus_reconstruction.py", h5_file_name: str = None, folder_path: str = None ) -> str: """ Python function that wraps around the application call for Tomopy reconstruction on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will run the reconstruction - h5_file_name (str): the name of the h5 file to be reconstructed - folder_path (str): the path to the folder containing the h5 file - - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the reconstruction + :param h5_file_name: the name of the h5 file to be reconstructed + :param folder_path: the path to the folder containing the h5 file + :return: confirmation message """ import os import subprocess @@ -122,11 +121,8 @@ def build_multi_resolution( """ Tiff to Zarr code that is executed using Globus 
Compute - Args: - file_path (str): Path to the file to be processed. - - Returns: - bool: True if the task completed successfully, False otherwise. + :param file_path: Path to the file to be processed. + :return: True if the task completed successfully, False otherwise. """ logger = get_run_logger() @@ -163,13 +159,11 @@ def _build_multi_resolution_wrapper( """ Python function that wraps around the application call for Tiff to Zarr on ALCF - Args: - rundir (str): the directory on the eagle file system (ALCF) where the input data are located - script_path (str): the path to the script that will convert the tiff files to zarr - recon_path (str): the path to the reconstructed data - raw_path (str): the path to the raw data - Returns: - str: confirmation message + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will convert the tiff files to zarr + :param recon_path: the path to the reconstructed data + :param raw_path: the path to the raw data + :return: confirmation message """ import os import subprocess @@ -185,24 +179,122 @@ def _build_multi_resolution_wrapper( f"Converted tiff files to zarr;\n {zarr_res}" ) + def segmentation( + self, + recon_folder_path: str = "", + ) -> bool: + """ + Run tomography segmentation at ALCF through Globus Compute. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + + # Operate on reconstructed data + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{recon_folder_path}" + output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{recon_folder_path}" + segmentation_module = "src.inference" + workdir = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo" + + gcc = Client(code_serialization_strategy=CombinedCode()) + + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation + endpoint_id = "168c595b-9493-42db-9c6a-aad960913de2" + # with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + with Executor(endpoint_id=endpoint_id, client=gcc) as fxe: + logger.info(f"Running segmentation on {recon_folder_path} at ALCF") + future = fxe.submit( + self._segmentation_wrapper, + input_dir=rundir, + output_dir=output_dir, + script_module=segmentation_module, + workdir=workdir + ) + result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) + return result + + @staticmethod + def _segmentation_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", + script_module: str = "src.inference", + workdir: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo", + nproc_per_node: int = 4, # 1 works + nnodes: int = 1, + nnode_rank: int = 0, + master_addr: str = "localhost", + master_port: str = "29500", + patch_size: int = 512, + batch_size: int = 1, + num_workers: int = 4, + confidence: float = 0.5, + prompts: list[str] = ["background", "cell"], + ) -> str: + """ + Python function that wraps around the application call for segmentation on ALCF + + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the segmentation + :param 
folder_path: the path to the folder containing the TIFF data to be segmented + :return: confirmation message + """ + import os + import subprocess + import time + + seg_start = time.time() + + # Move to directory where the segmentation code is located + os.chdir(workdir) + + # Run segmentation.py + command = [ + "python", "-m", "torch.distributed.run", + f"--nproc_per_node={nproc_per_node}", + f"--nnodes={nnodes}", + f"--node_rank={nnode_rank}", + f"--master_addr={master_addr}", + f"--master_port={master_port}", + "-m", script_module, + "--input-dir", input_dir, + "--output-dir", output_dir, + "--patch-size", str(patch_size), + "--batch-size", str(batch_size), + "--num-workers", str(num_workers), + "--confidence", str(confidence), + "--prompts", *prompts, + ] + + segment_res = subprocess.run(command) # stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + if segment_res.returncode != 0: + raise RuntimeError(f"Segmentation failed with return code {segment_res.returncode}") + + seg_end = time.time() + + print(f"Segmented data in {input_dir} in {seg_end-seg_start} seconds;\n {segment_res}") + return ( + f"Segmented data specified in {input_dir} in {seg_end-seg_start} seconds;\n" + f"{segment_res}" + ) + @staticmethod def _wait_for_globus_compute_future( future: Future, task_name: str, check_interval: int = 20, - walltime: int = 1200 # seconds = 20 minutes + walltime: int = 3600 # seconds = 60 minutes ) -> bool: """ Wait for a Globus Compute task to complete, assuming that if future.done() is False, the task is running. - Args: - future: The future object returned from the Globus Compute Executor submit method. - task_name: A descriptive name for the task being executed (used for logging). - check_interval: The interval (in seconds) between status checks. - walltime: The maximum time (in seconds) to wait for the task to complete. - - Returns: - bool: True if the task completed successfully within walltime, False otherwise. + :param future: The future object returned from the Globus Compute Executor submit method. + :param task_name: A descriptive name for the task being executed (used for logging). + :param check_interval: The interval (in seconds) between status checks. + :param walltime: The maximum time (in seconds) to wait for the task to complete. + :return: True if the task completed successfully within walltime, False otherwise. """ logger = get_run_logger() @@ -257,125 +349,17 @@ def _wait_for_globus_compute_future( return success -@task(name="schedule_prune_task") -def schedule_prune_task( - path: str, - location: str, - schedule_days: datetime.timedelta, - source_endpoint=None, - check_endpoint=None -) -> bool: - """ - Schedules a Prefect flow to prune files from a specified location. - - Args: - path (str): The file path to the folder containing the files. - location (str): The server location (e.g., 'alcf832_raw') where the files will be pruned. - schedule_days (int): The number of days after which the file should be deleted. - source_endpoint (str): The source endpoint for the files. - check_endpoint (str): The endpoint to check for the existence of the files. - - Returns: - bool: True if the task was scheduled successfully, False otherwise. 
- """ - logger = get_run_logger() - - try: - flow_name = f"delete {location}: {Path(path).name}" - schedule_prefect_flow( - deployment_name=f"prune_{location}/prune_{location}", - flow_run_name=flow_name, - parameters={ - "relative_path": path, - "source_endpoint": source_endpoint, - "check_endpoint": check_endpoint - }, - duration_from_now=schedule_days - ) - return True - except Exception as e: - logger.error(f"Failed to schedule prune task: {e}") - return False - - -@task(name="schedule_pruning") -def schedule_pruning( - alcf_raw_path: str = None, - alcf_scratch_path_tiff: str = None, - alcf_scratch_path_zarr: str = None, - nersc_scratch_path_tiff: str = None, - nersc_scratch_path_zarr: str = None, - data832_raw_path: str = None, - data832_scratch_path_tiff: str = None, - data832_scratch_path_zarr: str = None, - one_minute: bool = False, - config: Config832 = None -) -> bool: - """ - This function schedules the deletion of files from specified locations on ALCF, NERSC, and data832. - - Args: - alcf_raw_path (str, optional): The raw path of the h5 file on ALCF. - alcf_scratch_path_tiff (str, optional): The scratch path for TIFF files on ALCF. - alcf_scratch_path_zarr (str, optional): The scratch path for Zarr files on ALCF. - nersc_scratch_path_tiff (str, optional): The scratch path for TIFF files on NERSC. - nersc_scratch_path_zarr (str, optional): The scratch path for Zarr files on NERSC. - data832_scratch_path (str, optional): The scratch path on data832. - one_minute (bool, optional): Defaults to False. Whether to schedule the deletion after one minute. - config (Config832, optional): Configuration object for the flow. - - Returns: - bool: True if the tasks were scheduled successfully, False otherwise. - """ - logger = get_run_logger() - - pruning_config = Variable.get("pruning-config", _sync=True) - - if one_minute: - alcf_delay = datetime.timedelta(minutes=1) - nersc_delay = datetime.timedelta(minutes=1) - data832_delay = datetime.timedelta(minutes=1) - else: - alcf_delay = datetime.timedelta(days=pruning_config["delete_alcf832_files_after_days"]) - nersc_delay = datetime.timedelta(days=pruning_config["delete_nersc832_files_after_days"]) - data832_delay = datetime.timedelta(days=pruning_config["delete_data832_files_after_days"]) - - # (path, location, days, source_endpoint, check_endpoint) - delete_schedules = [ - (alcf_raw_path, "alcf832_raw", alcf_delay, config.alcf832_raw, config.data832_raw), - (alcf_scratch_path_tiff, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (alcf_scratch_path_zarr, "alcf832_scratch", alcf_delay, config.alcf832_scratch, config.data832_scratch), - (nersc_scratch_path_tiff, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (nersc_scratch_path_zarr, "nersc832_alsdev_scratch", nersc_delay, config.nersc832_alsdev_scratch, None), - (data832_raw_path, "data832_raw", data832_delay, config.data832_raw, None), - (data832_scratch_path_tiff, "data832_scratch", data832_delay, config.data832_scratch, None), - (data832_scratch_path_zarr, "data832_scratch", data832_delay, config.data832_scratch, None) - ] - - for path, location, days, source_endpoint, check_endpoint in delete_schedules: - if path: - schedule_prune_task(path, location, days, source_endpoint, check_endpoint) - logger.info(f"Scheduled delete from {location} at {days} days") - else: - logger.info(f"Path not provided for {location}, skipping scheduling of deletion task.") - - return True - - @flow(name="alcf_recon_flow", 
flow_run_name="alcf_recon-{file_path}") def alcf_recon_flow( file_path: str, config: Optional[Config832] = None, ) -> bool: """ - Process and transfer a file from a source to the ALCF. - - Args: - file_path (str): The path to the file to be processed. - config (Config832): Configuration object for the flow. + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. - Returns: - bool: True if the flow completed successfully, False otherwise. + :param file_path: The path to the file to be processed. + :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. """ logger = get_run_logger() @@ -390,18 +374,19 @@ def alcf_recon_flow( scratch_path_zarr = folder_name + '/rec' + file_name + '.zarr/' # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) # STEP 1: Transfer data from data832 to ALCF - logger.info("Copying data to ALCF.") + logger.info("Copying raw data to ALCF.") data832_raw_path = f"{folder_name}/{h5_file_name}" alcf_transfer_success = transfer_controller.copy( file_path=data832_raw_path, source=config.data832_raw, - destination=config.alcf832_raw + destination=config.alcf832_synaps_raw ) logger.info(f"Transfer status: {alcf_transfer_success}") @@ -411,14 +396,16 @@ def alcf_recon_flow( else: logger.info("Transfer to ALCF Successful.") - # STEP 2A: Run the Tomopy Reconstruction Globus Flow + # STEP 2: Run Tomopy Reconstruction on Globus Compute logger.info(f"Starting ALCF reconstruction flow for {file_path=}") # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") tomography_controller = get_controller( hpc_type=HPC.ALCF, config=config ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") alcf_reconstruction_success = tomography_controller.reconstruct( file_path=file_path, ) @@ -428,16 +415,17 @@ def alcf_recon_flow( else: logger.info("Reconstruction Successful.") - # Transfer A: Send reconstructed data (tiff) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # STEP 3: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_tiff_transfer_success = transfer_controller.copy( file_path=scratch_path_tiff, - source=config.alcf832_scratch, + source=config.alcf832_synaps_recon, destination=config.data832_scratch ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") - # STEP 2B: Run the Tiff to Zarr Globus Flow + # STEP 4: Run the Tiff to Zarr Globus Flow logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}") alcf_multi_res_success = tomography_controller.build_multi_resolution( file_path=file_path, @@ -447,32 +435,75 @@ def alcf_recon_flow( raise ValueError("Tiff to Zarr at ALCF Failed") else: logger.info("Tiff to Zarr Successful.") - # Transfer B: Send reconstructed data (zarr) to data832 - logger.info(f"Transferring {file_name} from {config.alcf832_scratch} " + # STEP 5: Send reconstructed data (zarr) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " f"at ALCF to {config.data832_scratch} at data832") data832_zarr_transfer_success = transfer_controller.copy( file_path=scratch_path_zarr, - 
source=config.alcf832_scratch, + source=config.alcf832_synaps_recon, destination=config.data832_scratch ) # Place holder in case we want to transfer to NERSC for long term storage - nersc_transfer_success = False - - data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success - schedule_pruning( - alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None, - alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, - nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None, - nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None, - data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, - data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None, - data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, - one_minute=False, # Set to False for production durations + # nersc_transfer_success = False + + # STEP 6: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, config=config ) + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune ZARR from ALCF scratch/reconstruction + if alcf_multi_res_success: + logger.info("Scheduling pruning of ALCF scratch zarr reconstruction data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune reconstructed ZARR from data832 scratch + if data832_zarr_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction ZARR data.") + prune_controller.prune( + file_path=scratch_path_zarr, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + # TODO: ingest to scicat if alcf_reconstruction_success and alcf_multi_res_success: @@ -481,11 +512,219 @@ def alcf_recon_flow( return False -if __name__ == "__main__": - folder_name = 'dabramov' - file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast' - flow_success = alcf_recon_flow( - file_path=f"/{folder_name}/{file_name}.h5", +@flow(name="alcf_forge_recon_segment_flow", flow_run_name="alcf_recon_seg-{file_path}") +def alcf_forge_recon_segment_flow( + file_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. + + :param file_path: The path to the file to be processed. 
+ :param config: Configuration object for the flow. + :return: True if the flow completed successfully, False otherwise. + """ + logger = get_run_logger() + + if config is None: + config = Config832() + # set up file paths + path = Path(file_path) + folder_name = path.parent.name + file_name = path.stem + h5_file_name = file_name + '.h5' + scratch_path_tiff = folder_name + '/rec' + file_name + '/' + scratch_path_segment = folder_name + '/seg' + file_name + '/' + + # initialize transfer_controller with globus + logger.info("Initializing Globus Transfer Controller.") + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + # STEP 1: Transfer data from data832 to ALCF + logger.info("Copying raw data to ALCF.") + data832_raw_path = f"{folder_name}/{h5_file_name}" + alcf_transfer_success = transfer_controller.copy( + file_path=data832_raw_path, + source=config.data832_raw, + destination=config.alcf832_synaps_raw + ) + logger.info(f"Transfer status: {alcf_transfer_success}") + + if not alcf_transfer_success: + logger.error("Transfer failed due to configuration or authorization issues.") + raise ValueError("Transfer to ALCF Failed") + else: + logger.info("Transfer to ALCF Successful.") + + # STEP 2: Run the Tomopy Reconstruction Globus Flow + logger.info(f"Starting ALCF reconstruction flow for {file_path=}") + + # Initialize the Tomography Controller and run the reconstruction + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF reconstruction task for {file_path=}") + alcf_reconstruction_success = tomography_controller.reconstruct( + file_path=file_path, + ) + if not alcf_reconstruction_success: + logger.error("Reconstruction Failed.") + raise ValueError("Reconstruction at ALCF Failed") + else: + logger.info("Reconstruction Successful.") + + # STEP 3: Send reconstructed data (tiff) to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_recon} " + f"at ALCF to {config.data832_scratch} at data832") + data832_tiff_transfer_success = transfer_controller.copy( + file_path=scratch_path_tiff, + source=config.alcf832_synaps_recon, + destination=config.data832_scratch + ) + logger.info(f"Transfer reconstructed TIFF data to data832 success: {data832_tiff_transfer_success}") + + # STEP 4: Run the Segmentation Task at ALCF + logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}") + alcf_segmentation_success = alcf_segmentation_task( + recon_folder_path=scratch_path_tiff, + config=config + ) + if not alcf_segmentation_success: + logger.warning("Segmentation at ALCF Failed") + else: + logger.info("Segmentation at ALCF Successful") + + # STEP 5: Send segmented data to data832 + logger.info(f"Transferring {file_name} from {config.alcf832_synaps_segment} " + f"at ALCF to {config.data832_scratch} at data832") + segment_transfer_success = transfer_controller.copy( + file_path=scratch_path_segment, + source=config.alcf832_synaps_segment, + destination=config.data832_scratch + ) + logger.info(f"Transfer segmented data to data832 success: {segment_transfer_success}") + + # STEP 6: Schedule Pruning of files + logger.info("Scheduling file pruning tasks.") + prune_controller = get_prune_controller( + prune_type=PruneMethod.GLOBUS, + config=config + ) + + # Prune from ALCF raw + if alcf_transfer_success: + logger.info("Scheduling pruning of ALCF raw data.") + prune_controller.prune( + 
file_path=data832_raw_path, + source_endpoint=config.alcf832_synaps_raw, + check_endpoint=None, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/reconstruction + if alcf_reconstruction_success: + logger.info("Scheduling pruning of ALCF scratch reconstruction data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.alcf832_synaps_recon, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune TIFFs from ALCF scratch/segmentation + if alcf_segmentation_success: + logger.info("Scheduling pruning of ALCF scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.alcf832_synaps_segment, + check_endpoint=config.data832_scratch, + days_from_now=2.0 + ) + + # Prune reconstructed TIFFs from data832 scratch + if data832_tiff_transfer_success: + logger.info("Scheduling pruning of data832 scratch reconstruction TIFF data.") + prune_controller.prune( + file_path=scratch_path_tiff, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # Prune segmented data from data832 scratch + if alcf_segmentation_success: + logger.info("Scheduling pruning of data832 scratch segmentation data.") + prune_controller.prune( + file_path=scratch_path_segment, + source_endpoint=config.data832_scratch, + check_endpoint=None, + days_from_now=30.0 + ) + + # TODO: ingest to scicat + + if alcf_reconstruction_success and alcf_segmentation_success: + return True + else: + return False + + +@task(name="alcf_segmentation_task") +def alcf_segmentation_task( + recon_folder_path: str, + config: Optional[Config832] = None, +) -> bool: + """ + Run segmentation task at ALCF. + + :param recon_folder_path: Path to the reconstructed data folder to be processed. + :param config: Configuration object for the flow. + :return: True if the task completed successfully, False otherwise. + """ + logger = get_run_logger() + if config is None: + logger.info("No config provided, using default Config832.") + config = Config832() + + # Initialize the Tomography Controller and run the segmentation + logger.info("Initializing ALCF Tomography HPC Controller.") + tomography_controller = get_controller( + hpc_type=HPC.ALCF, + config=config + ) + logger.info(f"Starting ALCF segmentation task for {recon_folder_path=}") + alcf_segmentation_success = tomography_controller.segmentation( + recon_folder_path=recon_folder_path, + ) + if not alcf_segmentation_success: + logger.error("Segmentation Failed.") + else: + logger.info("Segmentation Successful.") + return alcf_segmentation_success + + +@flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test") +def alcf_segmentation_integration_test() -> bool: + """ + Integration test for the ALCF segmentation task. + + :return: True if the segmentation task completed successfully, False otherwise. 
+ """ + logger = get_run_logger() + logger.info("Starting ALCF segmentation integration test.") + recon_folder_path = 'test' # 'rec20211222_125057_petiole4' + flow_success = alcf_segmentation_task( + recon_folder_path=recon_folder_path, config=Config832() ) - print(flow_success) + logger.info(f"Flow success: {flow_success}") + return flow_success + + +if __name__ == "__main__": + alcf_segmentation_integration_test() diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py index 788eef4a..d523952d 100644 --- a/orchestration/flows/bl832/config.py +++ b/orchestration/flows/bl832/config.py @@ -24,7 +24,10 @@ def _beam_specific_config(self) -> None: self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"] self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"] self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"] - self.alcf832_raw = self.endpoints["alcf832_raw"] - self.alcf832_scratch = self.endpoints["alcf832_scratch"] + self.alcf832_synaps_raw = self.endpoints["alcf832_synaps_raw"] + self.alcf832_synaps_recon = self.endpoints["alcf832_synaps_recon"] + self.alcf832_synaps_segment = self.endpoints["alcf832_synaps_segment"] + self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"] + self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"] self.scicat = self.config["scicat"] self.ghcr_images832 = self.config["ghcr_images832"] diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cf1d0c64..7c799c2b 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,9 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -55,22 +58,26 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") -def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict: +def setup_decision_settings(alcf_recon: bool, alcf_forge_recon_segment: bool, nersc_recon: bool, new_file_832: bool) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF forge reconstruction segmentation flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. - :param nersc_move: Boolean indicating whether to move files to NERSC. + :param new_file_832: Boolean indicating whether to run the new file 832 flow. :return: A dictionary containing the settings for each flow. 
""" logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " - f"nersc_recon={nersc_recon}, new_file_832={new_file_832}") + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"nersc_recon={nersc_recon}, " + f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "new_832_file_flow/new_file_832": new_file_832 } @@ -145,6 +152,13 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) @@ -169,7 +183,7 @@ async def dispatcher( """ try: # Setup decision settings based on input parameters - setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True) + setup_decision_settings(alcf_recon=True, alcf_forge_recon_segment=False, nersc_recon=True, new_file_832=True) # Run the main decision flow with the specified parameters # asyncio.run(dispatcher( # config={}, # PYTEST, ALCF, NERSC diff --git a/orchestration/flows/bl832/prefect.yaml b/orchestration/flows/bl832/prefect.yaml index a1d4613b..20858610 100644 --- a/orchestration/flows/bl832/prefect.yaml +++ b/orchestration/flows/bl832/prefect.yaml @@ -55,6 +55,12 @@ deployments: name: alcf_recon_flow_pool work_queue_name: alcf_recon_flow_queue +- alcf_forge_recon_segment_flow: + entrypoint: orchestration/flows/bl832/alcf.py:alcf_forge_recon_segment_flow + work_pool: + name: alcf_recon_flow_pool + work_queue_name: alcf_forge_recon_segment_flow_queue + # Pruning flows - name: prune_globus_endpoint entrypoint: orchestration/prune_controller.py:prune_globus_endpoint diff --git a/scripts/polaris/globus_compute_recon_config.yaml b/scripts/polaris/globus_compute_recon_config.yaml new file mode 100644 index 00000000..66ffd331 --- /dev/null +++ b/scripts/polaris/globus_compute_recon_config.yaml @@ -0,0 +1,39 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + max_workers: 1 # Sets one worker per node + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug + cpus_per_node: 64 + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle" + + # Node setup: activate necessary conda environment and such + 
worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction" + + walltime: 00:60:00 # Jobs will end after 60 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml new file mode 100644 index 00000000..6479e84f --- /dev/null +++ b/scripts/polaris/globus_compute_segment_config.yaml @@ -0,0 +1,48 @@ +engine: + type: GlobusComputeEngine # This engine uses the HighThroughputExecutor + max_retries_on_system_failure: 2 + # max_workers: 1 # Sets one worker per node + max_workers_per_node: 4 + prefetch_capacity: 0 # Increase if you have many more tasks than workers + + address: + type: address_by_interface + ifname: bond0 + + strategy: simple + job_status_kwargs: + max_idletime: 300 + strategy_period: 60 + + provider: + type: PBSProProvider + + launcher: + type: MpiExecLauncher + # Ensures 1 manger per node, work on all 64 cores + bind_cmd: --cpu-bind + overrides: --depth=64 --ppn 1 + + account: SYNAPS-I + queue: debug # debug (1-2 nodes), debug-scaling (1-10 nodes), or some other queue, probably want demand (1-56 nodes) for real-time things, prod (496 nodes) + # minimum node 1, max 56 nodes. Max time 59 minutes + cpus_per_node: 32 # may want to change to 4 (only 4 GPUs per node) + + # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe" + scheduler_options: "#PBS -l filesystems=home:eagle -l select=1:ngpus=4" + # Node setup: activate necessary conda environment and such + # worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/segmentation/env/; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_segmentation" + worker_init: | + module use /soft/modulefiles + module load conda + conda activate base + source /eagle/SYNAPS-I/segmentation/env/bin/activate + export HF_HUB_CACHE=/eagle/SYNAPS-I/segmentation/.cache/huggingface + export HF_HOME=$HF_HUB_CACHE + cd /eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo + + walltime: 59:00 # Jobs will end after 59 minutes + nodes_per_block: 2 # All jobs will have 1 node + init_blocks: 0 + min_blocks: 0 + max_blocks: 2 # No more than 1 job will be scheduled at a time
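
For reference, a minimal sketch (not part of this diff) of how the new combined flow could be invoked locally for a smoke test. It reuses the example dataset path from the previous __main__ block of alcf.py, which this patch replaces with alcf_segmentation_integration_test, and it assumes the Prefect blocks and Globus endpoints referenced above (alcf-allocation-root-path, globus-compute-endpoint, the alcf832_synaps_* entries in config.yml) are already registered.

# Hypothetical local invocation of the new combined recon + segmentation flow.
# The dataset path mirrors the example from the removed __main__ block.
from orchestration.flows.bl832.alcf import alcf_forge_recon_segment_flow
from orchestration.flows.bl832.config import Config832

if __name__ == "__main__":
    folder_name = "dabramov"
    file_name = "20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast"
    # Runs the Prefect flow directly as a function: raw transfer to ALCF,
    # reconstruction, TIFF transfer, segmentation, and pruning scheduling.
    flow_success = alcf_forge_recon_segment_flow(
        file_path=f"/{folder_name}/{file_name}.h5",
        config=Config832(),
    )
    print(f"Flow success: {flow_success}")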