diff --git a/docs/bl832_ALCF.md b/docs/bl832_ALCF.md index b6f9f8c0..6eda4975 100644 --- a/docs/bl832_ALCF.md +++ b/docs/bl832_ALCF.md @@ -428,7 +428,7 @@ def prune_alcf832_raw(relative_path: str): prune_one_safe( file=relative_path, if_older_than_days=0, - tranfer_client=tc, + transfer_client=tc, source_endpoint=config.alcf832_raw, check_endpoint=config.nersc832_alsdev_raw, logger=p_logger, diff --git a/docs/mkdocs/docs/alcf832.md b/docs/mkdocs/docs/alcf832.md index 3774832f..3d441e0d 100644 --- a/docs/mkdocs/docs/alcf832.md +++ b/docs/mkdocs/docs/alcf832.md @@ -389,7 +389,7 @@ def prune_alcf832_raw(relative_path: str): prune_one_safe( file=relative_path, if_older_than_days=0, - tranfer_client=tc, + transfer_client=tc, source_endpoint=config.alcf832_raw, check_endpoint=config.nersc832_alsdev_raw, logger=p_logger, diff --git a/orchestration/flows/bl832/prune.py b/orchestration/flows/bl832/prune.py index 1de05085..91ddb54f 100644 --- a/orchestration/flows/bl832/prune.py +++ b/orchestration/flows/bl832/prune.py @@ -36,7 +36,7 @@ def prune_files( prune_one_safe( file=relative_path, if_older_than_days=0, - tranfer_client=config.tc, + transfer_client=config.tc, source_endpoint=source_endpoint, check_endpoint=check_endpoint, logger=p_logger, diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index 764ba6ad..7a9c010a 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -1,13 +1,14 @@ from dataclasses import dataclass from datetime import datetime, timezone, timedelta from dateutil import parser +from dotenv import load_dotenv import json import logging import os from pathlib import Path from time import time -from typing import Dict, List, Union -from dotenv import load_dotenv +from typing import Dict, List, Optional, Union + from globus_sdk import ( ClientCredentialsAuthorizer, ConfidentialAppAuthClient, @@ -15,9 +16,10 @@ TransferClient, TransferData ) + +from ..config import get_config from prefect import task, get_run_logger # from prefect.blocks.system import Secret -from ..config import get_config load_dotenv() @@ -83,7 +85,7 @@ def build_apps(config: Dict) -> Dict[str, GlobusEndpoint]: @task -def init_transfer_client(app: GlobusApp) -> TransferClient: +def init_transfer_client(app: Optional[GlobusApp] = None) -> TransferClient: logger = get_run_logger() # Get the client id and secret from Prefect Secret Blocks GLOBUS_CLIENT_ID = os.getenv("GLOBUS_CLIENT_ID") @@ -269,7 +271,7 @@ def task_wait( def prune_one_safe( file: str, if_older_than_days: int, - tranfer_client: TransferClient, + transfer_client: TransferClient, source_endpoint: GlobusEndpoint, check_endpoint: Union[GlobusEndpoint, None], max_wait_seconds: int = 120, @@ -281,7 +283,7 @@ def prune_one_safe( is also located at the check_endpoint. If not, raises """ # does the file exist at the source endpoint? - g_file_obj = get_globus_file_object(tranfer_client, source_endpoint, file) + g_file_obj = get_globus_file_object(transfer_client, source_endpoint, file) assert g_file_obj is not None, f"file not found {source_endpoint.uri}" logger.info(f"file: {file} found on {source_endpoint.uri}") @@ -289,7 +291,7 @@ def prune_one_safe( if check_endpoint is None: logger.info("No check endpoint provided, skipping check") else: - g_file_obj = get_globus_file_object(tranfer_client, check_endpoint, file) + g_file_obj = get_globus_file_object(transfer_client, check_endpoint, file) assert g_file_obj is not None, f"file not found {check_endpoint.uri}" logger.info(f"file: {file} found on {check_endpoint.uri}") @@ -306,14 +308,14 @@ def prune_one_safe( logger.info("Not checking dates, sent if_older_than_days==0") delete_id = prune_files( - tranfer_client, + transfer_client, source_endpoint, [file], max_wait_seconds=max_wait_seconds, logger=logger, ) - task_wait(tranfer_client, delete_id) + task_wait(transfer_client, delete_id) logger.info(f"file deleted from: {source_endpoint.uri}") diff --git a/orchestration/prune_controller.py b/orchestration/prune_controller.py index c65b487c..4d97826d 100644 --- a/orchestration/prune_controller.py +++ b/orchestration/prune_controller.py @@ -327,12 +327,12 @@ def prune_globus_endpoint( logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") if not config: - tc = init_transfer_client() + tc = init_transfer_client(app=None) else: tc = config.tc globus_settings = Variable.get("globus-settings", _sync=True) max_wait_seconds = globus_settings["max_wait_seconds"] - flow_name = f"prune_from_{source_endpoint.name}" + flow_name = f"prune_{relative_path}_from_{source_endpoint.name}" logger.info(f"Running flow: {flow_name}") logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") prune_one_safe( diff --git a/pyproject.toml b/pyproject.toml index c121411f..7fee39f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "pydantic==2.11", "python-dotenv", "pyyaml", - "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@main", + "scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79", "sfapi_client" ] diff --git a/requirements.txt b/requirements.txt index 5f72de3d..527e9c0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,5 +15,5 @@ prometheus_client==0.21.1 pydantic==2.11 python-dotenv pyyaml -scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@main +scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79 sfapi_client