From c9830b04c61960f666ba78be5148c0261507d797 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:19:58 -0800 Subject: [PATCH 1/5] Making GlobusApp optional in init_transfer_client() --- orchestration/globus/transfer.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index 764ba6ad..4f5110f0 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -1,13 +1,14 @@ from dataclasses import dataclass from datetime import datetime, timezone, timedelta from dateutil import parser +from dotenv import load_dotenv import json import logging import os from pathlib import Path from time import time -from typing import Dict, List, Union -from dotenv import load_dotenv +from typing import Dict, List, Optional, Union + from globus_sdk import ( ClientCredentialsAuthorizer, ConfidentialAppAuthClient, @@ -15,9 +16,10 @@ TransferClient, TransferData ) + +from ..config import get_config from prefect import task, get_run_logger # from prefect.blocks.system import Secret -from ..config import get_config load_dotenv() @@ -83,7 +85,7 @@ def build_apps(config: Dict) -> Dict[str, GlobusEndpoint]: @task -def init_transfer_client(app: GlobusApp) -> TransferClient: +def init_transfer_client(app: Optional[GlobusApp]) -> TransferClient: logger = get_run_logger() # Get the client id and secret from Prefect Secret Blocks GLOBUS_CLIENT_ID = os.getenv("GLOBUS_CLIENT_ID") From 94b5316367fb2d6fb2d626ad548728f268611813 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:24:49 -0800 Subject: [PATCH 2/5] Pinning scicatt_beamline version --- pyproject.toml | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c121411f..7fee39f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "pydantic==2.11", "python-dotenv", "pyyaml", - "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@main", + "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79", "sfapi_client" ] diff --git a/requirements.txt b/requirements.txt index 5f72de3d..527e9c0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,5 +15,5 @@ prometheus_client==0.21.1 pydantic==2.11 python-dotenv pyyaml -scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@main +scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 sfapi_client From b22de392d553723cb4dfeaa3637f4f0a883b4a27 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:40:03 -0800 Subject: [PATCH 3/5] making app=None by default for init_transfer_controller() --- orchestration/globus/transfer.py | 2 +- orchestration/prune_controller.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index 4f5110f0..fb84656a 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -85,7 +85,7 @@ def build_apps(config: Dict) -> Dict[str, GlobusEndpoint]: @task -def init_transfer_client(app: Optional[GlobusApp]) -> TransferClient: +def init_transfer_client(app: Optional[GlobusApp] = None) -> TransferClient: logger = get_run_logger() # Get the client id and secret from Prefect Secret Blocks GLOBUS_CLIENT_ID = os.getenv("GLOBUS_CLIENT_ID") diff --git a/orchestration/prune_controller.py b/orchestration/prune_controller.py index c65b487c..f9a0ed9a 100644 --- a/orchestration/prune_controller.py +++ b/orchestration/prune_controller.py @@ -327,7 +327,7 @@ def prune_globus_endpoint( logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") if not config: - tc = init_transfer_client() + tc = init_transfer_client(app=None) else: tc = config.tc globus_settings = Variable.get("globus-settings", _sync=True) From 0c7adebc9ca9a293b2aee1adf3b47eb034c420b2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:44:44 -0800 Subject: [PATCH 4/5] Fixing tranfer_client typo -> transfer_client --- docs/bl832_ALCF.md | 2 +- docs/mkdocs/docs/alcf832.md | 2 +- orchestration/flows/bl832/prune.py | 2 +- orchestration/globus/transfer.py | 10 +++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/bl832_ALCF.md b/docs/bl832_ALCF.md index b6f9f8c0..6eda4975 100644 --- a/docs/bl832_ALCF.md +++ b/docs/bl832_ALCF.md @@ -428,7 +428,7 @@ def prune_alcf832_raw(relative_path: str): prune_one_safe( file=relative_path, if_older_than_days=0, - tranfer_client=tc, + transfer_client=tc, source_endpoint=config.alcf832_raw, check_endpoint=config.nersc832_alsdev_raw, logger=p_logger, diff --git a/docs/mkdocs/docs/alcf832.md b/docs/mkdocs/docs/alcf832.md index 3774832f..3d441e0d 100644 --- a/docs/mkdocs/docs/alcf832.md +++ b/docs/mkdocs/docs/alcf832.md @@ -389,7 +389,7 @@ def prune_alcf832_raw(relative_path: str): prune_one_safe( file=relative_path, if_older_than_days=0, - tranfer_client=tc, + transfer_client=tc, source_endpoint=config.alcf832_raw, check_endpoint=config.nersc832_alsdev_raw, logger=p_logger, diff --git a/orchestration/flows/bl832/prune.py b/orchestration/flows/bl832/prune.py index 1de05085..91ddb54f 100644 --- a/orchestration/flows/bl832/prune.py +++ b/orchestration/flows/bl832/prune.py @@ -36,7 +36,7 @@ def prune_files( prune_one_safe( file=relative_path, if_older_than_days=0, - tranfer_client=config.tc, + transfer_client=config.tc, source_endpoint=source_endpoint, check_endpoint=check_endpoint, logger=p_logger, diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index fb84656a..7a9c010a 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -271,7 +271,7 @@ def task_wait( def prune_one_safe( file: str, if_older_than_days: int, - tranfer_client: TransferClient, + transfer_client: TransferClient, source_endpoint: GlobusEndpoint, check_endpoint: Union[GlobusEndpoint, None], max_wait_seconds: int = 120, @@ -283,7 +283,7 @@ def prune_one_safe( is also located at the check_endpoint. If not, raises """ # does the file exist at the source endpoint? - g_file_obj = get_globus_file_object(tranfer_client, source_endpoint, file) + g_file_obj = get_globus_file_object(transfer_client, source_endpoint, file) assert g_file_obj is not None, f"file not found {source_endpoint.uri}" logger.info(f"file: {file} found on {source_endpoint.uri}") @@ -291,7 +291,7 @@ def prune_one_safe( if check_endpoint is None: logger.info("No check endpoint provided, skipping check") else: - g_file_obj = get_globus_file_object(tranfer_client, check_endpoint, file) + g_file_obj = get_globus_file_object(transfer_client, check_endpoint, file) assert g_file_obj is not None, f"file not found {check_endpoint.uri}" logger.info(f"file: {file} found on {check_endpoint.uri}") @@ -308,14 +308,14 @@ def prune_one_safe( logger.info("Not checking dates, sent if_older_than_days==0") delete_id = prune_files( - tranfer_client, + transfer_client, source_endpoint, [file], max_wait_seconds=max_wait_seconds, logger=logger, ) - task_wait(tranfer_client, delete_id) + task_wait(transfer_client, delete_id) logger.info(f"file deleted from: {source_endpoint.uri}") From 342b2089e66b0725ab9d47225f6c111c8cb9bf70 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 2 Feb 2026 10:48:40 -0800 Subject: [PATCH 5/5] adding relative path to prune flow run name --- orchestration/prune_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/prune_controller.py b/orchestration/prune_controller.py index f9a0ed9a..4d97826d 100644 --- a/orchestration/prune_controller.py +++ b/orchestration/prune_controller.py @@ -332,7 +332,7 @@ def prune_globus_endpoint( tc = config.tc globus_settings = Variable.get("globus-settings", _sync=True) max_wait_seconds = globus_settings["max_wait_seconds"] - flow_name = f"prune_from_{source_endpoint.name}" + flow_name = f"prune_{relative_path}_from_{source_endpoint.name}" logger.info(f"Running flow: {flow_name}") logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") prune_one_safe(