Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/bl832_ALCF.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def prune_alcf832_raw(relative_path: str):
prune_one_safe(
file=relative_path,
if_older_than_days=0,
tranfer_client=tc,
transfer_client=tc,
source_endpoint=config.alcf832_raw,
check_endpoint=config.nersc832_alsdev_raw,
logger=p_logger,
Expand Down
2 changes: 1 addition & 1 deletion docs/mkdocs/docs/alcf832.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def prune_alcf832_raw(relative_path: str):
prune_one_safe(
file=relative_path,
if_older_than_days=0,
tranfer_client=tc,
transfer_client=tc,
source_endpoint=config.alcf832_raw,
check_endpoint=config.nersc832_alsdev_raw,
logger=p_logger,
Expand Down
2 changes: 1 addition & 1 deletion orchestration/flows/bl832/prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def prune_files(
prune_one_safe(
file=relative_path,
if_older_than_days=0,
tranfer_client=config.tc,
transfer_client=config.tc,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
logger=p_logger,
Expand Down
20 changes: 11 additions & 9 deletions orchestration/globus/transfer.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from dateutil import parser
from dotenv import load_dotenv
import json
import logging
import os
from pathlib import Path
from time import time
from typing import Dict, List, Union
from dotenv import load_dotenv
from typing import Dict, List, Optional, Union

from globus_sdk import (
ClientCredentialsAuthorizer,
ConfidentialAppAuthClient,
DeleteData,
TransferClient,
TransferData
)

from ..config import get_config
from prefect import task, get_run_logger
# from prefect.blocks.system import Secret
from ..config import get_config

load_dotenv()

Expand Down Expand Up @@ -83,7 +85,7 @@ def build_apps(config: Dict) -> Dict[str, GlobusEndpoint]:


@task
def init_transfer_client(app: GlobusApp) -> TransferClient:
def init_transfer_client(app: Optional[GlobusApp] = None) -> TransferClient:
logger = get_run_logger()
# Get the client id and secret from Prefect Secret Blocks
GLOBUS_CLIENT_ID = os.getenv("GLOBUS_CLIENT_ID")
Expand Down Expand Up @@ -269,7 +271,7 @@ def task_wait(
def prune_one_safe(
file: str,
if_older_than_days: int,
tranfer_client: TransferClient,
transfer_client: TransferClient,
source_endpoint: GlobusEndpoint,
check_endpoint: Union[GlobusEndpoint, None],
max_wait_seconds: int = 120,
Expand All @@ -281,15 +283,15 @@ def prune_one_safe(
is also located at the check_endpoint. If not, raises
"""
# does the file exist at the source endpoint?
g_file_obj = get_globus_file_object(tranfer_client, source_endpoint, file)
g_file_obj = get_globus_file_object(transfer_client, source_endpoint, file)
assert g_file_obj is not None, f"file not found {source_endpoint.uri}"
logger.info(f"file: {file} found on {source_endpoint.uri}")

# does the file exist at the check endpoint?
if check_endpoint is None:
logger.info("No check endpoint provided, skipping check")
else:
g_file_obj = get_globus_file_object(tranfer_client, check_endpoint, file)
g_file_obj = get_globus_file_object(transfer_client, check_endpoint, file)
assert g_file_obj is not None, f"file not found {check_endpoint.uri}"
logger.info(f"file: {file} found on {check_endpoint.uri}")

Expand All @@ -306,14 +308,14 @@ def prune_one_safe(
logger.info("Not checking dates, sent if_older_than_days==0")

delete_id = prune_files(
tranfer_client,
transfer_client,
source_endpoint,
[file],
max_wait_seconds=max_wait_seconds,
logger=logger,
)

task_wait(tranfer_client, delete_id)
task_wait(transfer_client, delete_id)
logger.info(f"file deleted from: {source_endpoint.uri}")


Expand Down
4 changes: 2 additions & 2 deletions orchestration/prune_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,12 @@ def prune_globus_endpoint(
logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'")

if not config:
tc = init_transfer_client()
tc = init_transfer_client(app=None)
else:
tc = config.tc
globus_settings = Variable.get("globus-settings", _sync=True)
max_wait_seconds = globus_settings["max_wait_seconds"]
flow_name = f"prune_from_{source_endpoint.name}"
flow_name = f"prune_{relative_path}_from_{source_endpoint.name}"
logger.info(f"Running flow: {flow_name}")
logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}")
prune_one_safe(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies = [
"pydantic==2.11",
"python-dotenv",
"pyyaml",
"scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@main",
"scicat_beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79",
"sfapi_client"
]

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ prometheus_client==0.21.1
pydantic==2.11
python-dotenv
pyyaml
scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@main
scicat-beamline @ git+https://github.com/als-computing/scicat_beamline.git@4828273f5f49ba4eba5442728729e0545b3f5b79
sfapi_client