From a1fffe9a91633d04a77c9a98d7275d9c8638c469 Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Sun, 16 Feb 2025 16:06:26 -0500
Subject: [PATCH 1/5] Added fathom_net_crop_fix tool

---
 src/DD_tools/fathom_net_crop_fix/README.md   |   8 +
 src/DD_tools/fathom_net_crop_fix/__init__.py |   0
 src/DD_tools/fathom_net_crop_fix/classes.py  | 181 +++++++++++++++++++
 3 files changed, 189 insertions(+)
 create mode 100644 src/DD_tools/fathom_net_crop_fix/README.md
 create mode 100644 src/DD_tools/fathom_net_crop_fix/__init__.py
 create mode 100644 src/DD_tools/fathom_net_crop_fix/classes.py

diff --git a/src/DD_tools/fathom_net_crop_fix/README.md b/src/DD_tools/fathom_net_crop_fix/README.md
new file mode 100644
index 0000000..7da7009
--- /dev/null
+++ b/src/DD_tools/fathom_net_crop_fix/README.md
@@ -0,0 +1,8 @@
+Tool to fix FathomNet cropped images (crop again from the original with an updated algorithm).
+
+Additional config fields:
+
+* `uuid_table_path` - path to the UUID table of images to re-crop
+* `look_up_table_path` - path to the lookup table with `uuid - file_name` information
+* `filtered_by_size` - original CSV that was used for cropping (contains `uuid` matches and crop coordinates)
+* `data_transfer_table` - CSV that maps each ToL dataset file to its original image
\ No newline at end of file
diff --git a/src/DD_tools/fathom_net_crop_fix/__init__.py b/src/DD_tools/fathom_net_crop_fix/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/DD_tools/fathom_net_crop_fix/classes.py b/src/DD_tools/fathom_net_crop_fix/classes.py
new file mode 100644
index 0000000..8c9d9bd
--- /dev/null
+++ b/src/DD_tools/fathom_net_crop_fix/classes.py
@@ -0,0 +1,181 @@
+import hashlib
+import os
+import uuid
+from typing import List
+
+import numpy as np
+import pandas as pd
+import pyspark.sql.functions as func
+
+from DD_tools.main.config import Config
+from DD_tools.main.filters import FilterRegister, SparkFilterToolBase
+from DD_tools.main.runners import MPIRunnerTool, RunnerRegister
+from DD_tools.main.schedulers import DefaultScheduler, SchedulerRegister
+from DD_tools.main.utils import load_dataframe
+
+server_pattern = r"server=([^/]+)"
+
+
+@FilterRegister("fathom_net_crop_fix")
+class FathomnetCropFixFilter(SparkFilterToolBase):
+    def __init__(self, cfg: Config):
+        super().__init__(cfg)
+
+        self.filter_name: str = "fathom_net_crop_fix"
+
+    def run(self):
+        uuid_table_df = load_dataframe(
+            self.spark, self.config["uuid_table_path"]
+        ).repartition(20)
+        lookup_table_df = load_dataframe(
+            self.spark, self.config["look_up_table_path"]
+        ).repartition(20)
+
+        merged_df = uuid_table_df.join(
+            lookup_table_df, on="uuid", how="left"
+        ).withColumn("server", func.regexp_extract("path", server_pattern, 1))
+
+        (
+            merged_df.repartition(1).write.csv(
+                os.path.join(self.tools_path, self.filter_name, "filter_table"),
+                header=True,
+                mode="overwrite",
+            )
+        )
+
+
+@SchedulerRegister("fathom_net_crop_fix")
+class FathomnetCropFixScheduleCreation(DefaultScheduler):
+    def __init__(self, cfg: Config):
+        super().__init__(cfg)
+
+        self.filter_name: str = "fathom_net_crop_fix"
+        self.scheme = ["server"]
+
+
+@RunnerRegister("fathom_net_crop_fix")
+class FathomnetCropFixRunner(MPIRunnerTool):
+    def __init__(self, cfg: Config):
+        super().__init__(cfg)
+        self.filter_name: str = "fathom_net_crop_fix"
+        self.data_scheme: List[str] = ["uuid", "server", "path"]
+        self.verification_scheme: List[str] = ["server"]
+        self.total_time = 150
+
+        self.data_transfer_df = pd.read_csv(cfg["data_transfer_table"])
+        self.bb_df = pd.read_csv(cfg["filtered_by_size"])
+        self.image_crop_path = os.path.join(
+            cfg.get_folder("path_to_output_folder"), "image_crop"
+        )
+        self.base_path = (
+            "/fs/scratch/PAS2136/gbif/processed/fathomNet/images_full/source=fathomNet"
+        )
+        self.original_image_base_path = (
+            "/fs/scratch/PAS2136/gbif/processed/fathomNet/images_full/downloaded_images"
+        )
+
+    def apply_filter(self, filtering_df: pd.DataFrame, server_name: str) -> int:
+        self.is_enough_time()
+        uuids_df = self.bb_df.merge(
+            filtering_df[["uuid"]],
+            left_on="tol_uuid",
+            right_on="uuid",
+            how="inner",
+            validate="1:1",
+        )
+        cropped_images = []
+
+        for full_path, images_df in filtering_df.groupby("path"):
+            assert isinstance(full_path, str), "Not a string"
+
+            file_name = os.path.basename(full_path)
+            original_image_path = (
+                self.original_image_base_path
+                + self.data_transfer_df[
+                    self.data_transfer_df["dst_path"]
+                    == os.path.join(self.base_path, f"server={server_name}", file_name)
+                ].iloc[0]["src_path"][67:]
+            )
+
+            if not os.path.exists(original_image_path):
+                self.logger.info(f"Path doesn't exist: {original_image_path}")
+                return 0
+
+            full_image = pd.read_parquet(
+                original_image_path,
+                filters=[("source_id", "in", uuids_df["image_uuid"])],
+            )
+
+            self.is_enough_time()
+
+            columns = full_image.columns
+            full_image = full_image.merge(
+                self.bb_df,
+                left_on="source_id",
+                right_on="image_uuid",
+                how="inner",
+                validate="1:m",
+            )
+
+            for _, row in full_image.iterrows():
+                cropped_entry = row[columns].to_dict()
+                image_binary = row["image"]
+                image_size = row["resized_size"]
+                image_np = np.frombuffer(image_binary, dtype=np.uint8).reshape(
+                    [image_size[0], image_size[1], 3]
+                )
+
+                # fix
+                min_y = min(image_size[0], max(row["y"], 0))
+                min_x = min(image_size[1], max(row["x"], 0))
+                max_y = min(image_size[0], max(row["y"] + row["height"], 0))
+                max_x = min(image_size[1], max(row["x"] + row["width"], 0))
+
+                image_cropped = image_np[min_y:max_y, min_x:max_x]
+
+                cropped_entry["image"] = image_cropped.tobytes()
+                cropped_entry["resized_size"] = (max_y - min_y, max_x - min_x)
+                cropped_entry["hashsum_resized"] = hashlib.md5(
+                    cropped_entry["image"]
+                ).hexdigest()
+                cropped_entry["uuid"] = row["tol_uuid"]
+                cropped_entry["source_id"] = row["bb_uuid"]
+
+                assert len(cropped_entry["image"]) == (
+                    cropped_entry["resized_size"][0]
+                    * cropped_entry["resized_size"][1]
+                    * 3
+                ), f"Size mismatch for {row['tol_uuid']}"
+
+                cropped_images.append(cropped_entry)
+
+        self.is_enough_time()
+        cropped_image = pd.DataFrame(cropped_images)
+        output_path = os.path.join(self.image_crop_path, f"server={server_name}")
+        os.makedirs(output_path, exist_ok=True)
+        cropped_image.to_parquet(
+            os.path.join(output_path, f"data_{uuid.uuid4()}.parquet"),
+            index=False,
+            compression="zstd",
+            compression_level=3,
+        )
+
+        return len(cropped_image)
+
+    def runner_fn(self, df_local: pd.DataFrame) -> int:
+        filtering_df = df_local.reset_index(drop=True)
+        server_name = filtering_df.iloc[0]["server"]
+        try:
+            filtered_parquet_length = self.apply_filter(filtering_df, server_name)
+        except NotImplementedError:
+            raise NotImplementedError("Filter function wasn't implemented")
+        except Exception as e:
+            self.logger.exception(e)
+            self.logger.error(f"Error occurred: {e}")
+            return 0
+        else:
+            print(f"{server_name}", end="\n", file=self.verification_IO)
+            self.logger.debug(
+                f"Completed filtering: {server_name} with {filtered_parquet_length}"
+            )
+            return 1

From f41119261d8b8b6a8a7da29ce0cfbe37b604b389 Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Mon, 12 May 2025 02:29:05 -0400
Subject: [PATCH 2/5] Rename project from 'DD_tools' to 'TreeOfLife_toolbox'.

Updated package structure, filenames, and references to reflect the new name.
Adjusted `pyproject.toml` to rename the project, update dependencies, and
modify supported Python versions. These changes ensure consistency and
alignment with the new project branding.

---
 pyproject.toml                       | 28 ++++----------
 .../__init__.py                      |  0
 .../main/__about__.py                |  0
 .../main/checkpoint.py               |  0
 .../main/config.py                   |  0
 .../main/config_templates/tools.yaml |  0
 .../main/filter.py                   |  0
 .../main/filters.py                  |  0
 .../main/main.py                     |  0
 .../main/registry.py                 |  0
 .../main/runner.py                   |  0
 .../main/runners.py                  |  0
 .../main/scheduler.py                |  0
 .../main/schedulers.py               |  0
 .../main/utils.py                    |  0
 .../main/verification.py             |  0
 16 files changed, 5 insertions(+), 23 deletions(-)
 rename src/{DD_tools => TreeOfLife_toolbox}/__init__.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/__about__.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/checkpoint.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/config.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/config_templates/tools.yaml (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/filter.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/filters.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/main.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/registry.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/runner.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/runners.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/scheduler.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/schedulers.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/utils.py (100%)
 rename src/{DD_tools => TreeOfLife_toolbox}/main/verification.py (100%)

diff --git a/pyproject.toml b/pyproject.toml
index 17be78f..cb76174 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,10 +3,10 @@ requires = ["hatchling"]
 build-backend = "hatchling.build"
 
 [tool.hatch.build.targets.wheel]
-packages = ["src/DD_tools"]
+packages = ["src/TreeOfLife_toolbox"]
 
 [project]
-name = "DD_tools"
+name = "TreeOfLife_toolbox"
 dynamic = ["version"]
 authors = [
     { name = "Andrey Kopanev", email = "kopanev.1@osu.edu" },
@@ -15,9 +15,9 @@ authors = [
 ]
 description = "A tool for downloading files from a list of URLs in parallel."
 readme = "README.md"
-requires-python = ">=3.8"
+requires-python = ">=3.10, <3.12"
 classifiers = [
     "Programming Language :: Python :: 3",
     "License :: OSI Approved :: MIT License",
     "Operating System :: OS Independent",
 ]
@@ -24,48 +24,30 @@
 dependencies = [
     "attrs",
     "brotli",
-    "certifi",
-    "charset-normalizer",
     "cramjam",
     "cython",
-    "exceptiongroup",
     "fsspec",
-    "hatchling",
-    "idna",
     "inflate64",
-    "iniconfig",
-    "mpi4py < 4",
+    "mpi4py",
     "multivolumefile",
-    "numpy",
     "opencv-python",
-    "packaging",
     "pandas",
     "pathspec",
     "pillow",
-    "pip",
-    "pluggy",
     "psutil",
-    "py4j",
     "pyarrow",
     "pybcj",
     "pycryptodomex",
     "pyppmd",
     "pyspark",
-    "pytest",
-    "python-dateutil",
     "python-dotenv",
-    "pytz",
     "pyyaml",
     "pyzstd",
     "requests",
     "setuptools",
-    "six",
     "texttable",
-    "tomli",
     "trove-classifiers",
    "typing-extensions",
-    "tzdata",
-    "urllib3",
     "wheel"
 ]
 
@@ -85,4 +67,4 @@ Repository = "https://github.com/Imageomics/distributed-downloader.git"
 "Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"
 
 [tool.hatch.version]
-path = "src/DD_tools/main/__about__.py"
+path = "src/TreeOfLife_toolbox/main/__about__.py"
diff --git a/src/DD_tools/__init__.py b/src/TreeOfLife_toolbox/__init__.py
similarity index 100%
rename from src/DD_tools/__init__.py
rename to src/TreeOfLife_toolbox/__init__.py
diff --git a/src/DD_tools/main/__about__.py b/src/TreeOfLife_toolbox/main/__about__.py
similarity index 100%
rename from src/DD_tools/main/__about__.py
rename to src/TreeOfLife_toolbox/main/__about__.py
diff --git a/src/DD_tools/main/checkpoint.py b/src/TreeOfLife_toolbox/main/checkpoint.py
similarity index 100%
rename from src/DD_tools/main/checkpoint.py
rename to src/TreeOfLife_toolbox/main/checkpoint.py
diff --git a/src/DD_tools/main/config.py b/src/TreeOfLife_toolbox/main/config.py
similarity index 100%
rename from src/DD_tools/main/config.py
rename to src/TreeOfLife_toolbox/main/config.py
diff --git a/src/DD_tools/main/config_templates/tools.yaml b/src/TreeOfLife_toolbox/main/config_templates/tools.yaml
similarity index 100%
rename from src/DD_tools/main/config_templates/tools.yaml
rename to src/TreeOfLife_toolbox/main/config_templates/tools.yaml
diff --git a/src/DD_tools/main/filter.py b/src/TreeOfLife_toolbox/main/filter.py
similarity index 100%
rename from src/DD_tools/main/filter.py
rename to src/TreeOfLife_toolbox/main/filter.py
diff --git a/src/DD_tools/main/filters.py b/src/TreeOfLife_toolbox/main/filters.py
similarity index 100%
rename from src/DD_tools/main/filters.py
rename to src/TreeOfLife_toolbox/main/filters.py
diff --git a/src/DD_tools/main/main.py b/src/TreeOfLife_toolbox/main/main.py
similarity index 100%
rename from src/DD_tools/main/main.py
rename to src/TreeOfLife_toolbox/main/main.py
diff --git a/src/DD_tools/main/registry.py b/src/TreeOfLife_toolbox/main/registry.py
similarity index 100%
rename from src/DD_tools/main/registry.py
rename to src/TreeOfLife_toolbox/main/registry.py
diff --git a/src/DD_tools/main/runner.py b/src/TreeOfLife_toolbox/main/runner.py
similarity index 100%
rename from src/DD_tools/main/runner.py
rename to src/TreeOfLife_toolbox/main/runner.py
diff --git a/src/DD_tools/main/runners.py b/src/TreeOfLife_toolbox/main/runners.py
similarity index 100%
rename from src/DD_tools/main/runners.py
rename to src/TreeOfLife_toolbox/main/runners.py
diff --git a/src/DD_tools/main/scheduler.py b/src/TreeOfLife_toolbox/main/scheduler.py
similarity index 100%
rename from src/DD_tools/main/scheduler.py
rename to src/TreeOfLife_toolbox/main/scheduler.py
diff --git a/src/DD_tools/main/schedulers.py b/src/TreeOfLife_toolbox/main/schedulers.py
similarity index 100%
rename from src/DD_tools/main/schedulers.py
rename to src/TreeOfLife_toolbox/main/schedulers.py
diff --git a/src/DD_tools/main/utils.py b/src/TreeOfLife_toolbox/main/utils.py
similarity index 100%
rename from src/DD_tools/main/utils.py
rename to src/TreeOfLife_toolbox/main/utils.py
diff --git a/src/DD_tools/main/verification.py b/src/TreeOfLife_toolbox/main/verification.py
similarity index 100%
rename from src/DD_tools/main/verification.py
rename to src/TreeOfLife_toolbox/main/verification.py

From 4a982dd82b03ab2bd8e9a8fc1a7ea270e51b6f53 Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Mon, 12 May 2025 02:34:30 -0400
Subject: [PATCH 3/5] Refactor import paths to use TreeOfLife_toolbox module.

Updated all import statements to reference TreeOfLife_toolbox instead of
DD_tools for consistency and clarity. Adjusted slurm scripts to align with
the new module structure and standardized environment variables for toolbox
path configuration.

---
 scripts/tools_filter.slurm                  |  3 +--
 scripts/tools_scheduler.slurm               |  3 +--
 scripts/tools_verifier.slurm                |  3 +--
 scripts/tools_worker.slurm                  |  3 +--
 src/TreeOfLife_toolbox/main/filter.py       |  8 ++++----
 src/TreeOfLife_toolbox/main/filters.py      |  8 ++++----
 src/TreeOfLife_toolbox/main/main.py         | 10 ++++++----
 src/TreeOfLife_toolbox/main/registry.py     |  4 ++--
 src/TreeOfLife_toolbox/main/runner.py       |  8 ++++----
 src/TreeOfLife_toolbox/main/runners.py      |  4 ++--
 src/TreeOfLife_toolbox/main/scheduler.py    |  8 ++++----
 src/TreeOfLife_toolbox/main/schedulers.py   |  4 ++--
 src/TreeOfLife_toolbox/main/verification.py | 10 +++++-----
 13 files changed, 37 insertions(+), 39 deletions(-)

diff --git a/scripts/tools_filter.slurm b/scripts/tools_filter.slurm
index 4642e34..6aee3f6 100644
--- a/scripts/tools_filter.slurm
+++ b/scripts/tools_filter.slurm
@@ -19,11 +19,10 @@ executor_memory="64G"
 module load spark/3.4.1
 module load miniconda3/23.3.1-py310
 source "${REPO_ROOT}/.venv/bin/activate"
-export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"
 
 pbs-spark-submit \
     --driver-memory $driver_memory \
     --executor-memory $executor_memory \
-    "${REPO_ROOT}/src/distributed_downloader/tools/filter.py" \
+    "${TOOLBOX_PATH}/main/filter.py" \
     "${tool_name}" \
     > "${logs_dir}/tool_filter.log"
diff --git a/scripts/tools_scheduler.slurm b/scripts/tools_scheduler.slurm
index e4fb6a2..ea35a32 100644
--- a/scripts/tools_scheduler.slurm
+++ b/scripts/tools_scheduler.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
 source "${REPO_ROOT}/.venv/bin/activate"
 export PYARROW_IGNORE_TIMEZONE=1
 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
-export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"
 
 srun \
     --mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
     --cpus-per-task=1 \
     --mem=0 \
     --output="${logs_dir}/tool_scheduler.log" \
-    python "${REPO_ROOT}/src/distributed_downloader/tools/scheduler.py" "${tool_name}"
+    python "${TOOLBOX_PATH}/main/scheduler.py" "${tool_name}"
diff --git a/scripts/tools_verifier.slurm b/scripts/tools_verifier.slurm
index 98ca024..6a3b75e 100644
--- a/scripts/tools_verifier.slurm
+++ b/scripts/tools_verifier.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
 source "${REPO_ROOT}/.venv/bin/activate"
 export PYARROW_IGNORE_TIMEZONE=1
 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
-export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"
 
 srun \
     --mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
     --cpus-per-task=1 \
     --mem=0 \
     --output="${logs_dir}/tool_verifier.log" \
-    python "${REPO_ROOT}/src/distributed_downloader/tools/verification.py" "${tool_name}"
+    python "${TOOLBOX_PATH}/main/verification.py" "${tool_name}"
diff --git a/scripts/tools_worker.slurm b/scripts/tools_worker.slurm
index 2ee2662..4856e62 100644
--- a/scripts/tools_worker.slurm
+++ b/scripts/tools_worker.slurm
@@ -19,7 +19,6 @@ module load miniconda3/23.3.1-py310
 source "${REPO_ROOT}/.venv/bin/activate"
 export PYARROW_IGNORE_TIMEZONE=1
 export I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0
-export PYTHONPATH=${PYTHONPATH}:"${REPO_ROOT}/src":"${REPO_ROOT}/distributed-downloader"
 
 srun \
     --mpi=pmi2 \
@@ -28,4 +27,4 @@ srun \
     --cpus-per-task="$TOOLS_CPU_PER_WORKER" \
     --mem=0 \
     --output="${logs_dir}/tool_worker-%2t.log" \
-    python "${REPO_ROOT}/src/distributed_downloader/tools/runner.py" "${tool_name}"
+    python "${TOOLBOX_PATH}/main/runner.py" "${tool_name}"
diff --git a/src/TreeOfLife_toolbox/main/filter.py b/src/TreeOfLife_toolbox/main/filter.py
index 080e1a2..ed526c5 100644
--- a/src/TreeOfLife_toolbox/main/filter.py
+++ b/src/TreeOfLife_toolbox/main/filter.py
@@ -1,10 +1,10 @@
 import argparse
 import os
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
diff --git a/src/TreeOfLife_toolbox/main/filters.py b/src/TreeOfLife_toolbox/main/filters.py
index 11c9426..385f18e 100644
--- a/src/TreeOfLife_toolbox/main/filters.py
+++ b/src/TreeOfLife_toolbox/main/filters.py
@@ -7,10 +7,10 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructType
 
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsBase
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import SuccessEntry
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsBase
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import SuccessEntry
 
 FilterRegister = partial(ToolsRegistryBase.register, "filter")
diff --git a/src/TreeOfLife_toolbox/main/main.py b/src/TreeOfLife_toolbox/main/main.py
index b3d5732..5272354 100644
--- a/src/TreeOfLife_toolbox/main/main.py
+++ b/src/TreeOfLife_toolbox/main/main.py
@@ -1,15 +1,16 @@
 import argparse
 import os
 from logging import Logger
+from pathlib import Path
 from typing import Dict, List, Optional, TextIO, Tuple
 
 import pandas as pd
 from attr import Factory, define, field
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import (
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import (
     init_logger,
     ensure_created,
     truncate_paths,
@@ -78,6 +79,7 @@ def __attrs_post_init__(self):
 
     def __init_environment(self) -> None:
         os.environ["CONFIG_PATH"] = self.config.config_path
+        os.environ["TOOLBOX_PATH"] = str(Path(__file__).parent.parent.resolve())
         os.environ["ACCOUNT"] = self.config["account"]
         os.environ["PATH_TO_INPUT"] = self.config["path_to_input"]
diff --git a/src/TreeOfLife_toolbox/main/registry.py b/src/TreeOfLife_toolbox/main/registry.py
index 12774dd..03cf9d6 100644
--- a/src/TreeOfLife_toolbox/main/registry.py
+++ b/src/TreeOfLife_toolbox/main/registry.py
@@ -1,7 +1,7 @@
 from typing import Dict, Type, Optional
 
-from DD_tools.main.config import Config
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.utils import init_logger
 
 
 class ToolsRegistryBase(type):
diff --git a/src/TreeOfLife_toolbox/main/runner.py b/src/TreeOfLife_toolbox/main/runner.py
index 214237e..77dcefa 100644
--- a/src/TreeOfLife_toolbox/main/runner.py
+++ b/src/TreeOfLife_toolbox/main/runner.py
@@ -1,10 +1,10 @@
 import argparse
 import os
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
diff --git a/src/TreeOfLife_toolbox/main/runners.py b/src/TreeOfLife_toolbox/main/runners.py
index cd875d3..bfb5d5e 100644
--- a/src/TreeOfLife_toolbox/main/runners.py
+++ b/src/TreeOfLife_toolbox/main/runners.py
@@ -6,8 +6,8 @@
 
 import pandas as pd
 
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsBase, ToolsRegistryBase
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsBase, ToolsRegistryBase
 
 RunnerRegister = partial(ToolsRegistryBase.register, "runner")
diff --git a/src/TreeOfLife_toolbox/main/scheduler.py b/src/TreeOfLife_toolbox/main/scheduler.py
index 707b656..d686ae6 100644
--- a/src/TreeOfLife_toolbox/main/scheduler.py
+++ b/src/TreeOfLife_toolbox/main/scheduler.py
@@ -1,10 +1,10 @@
 import argparse
 import os
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")
diff --git a/src/TreeOfLife_toolbox/main/schedulers.py b/src/TreeOfLife_toolbox/main/schedulers.py
index ed70a9c..6b2c6e2 100644
--- a/src/TreeOfLife_toolbox/main/schedulers.py
+++ b/src/TreeOfLife_toolbox/main/schedulers.py
@@ -5,8 +5,8 @@
 
 import pandas as pd
 
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsBase, ToolsRegistryBase
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsBase, ToolsRegistryBase
 
 SchedulerRegister = partial(ToolsRegistryBase.register, "scheduler")
diff --git a/src/TreeOfLife_toolbox/main/verification.py b/src/TreeOfLife_toolbox/main/verification.py
index 742bb86..31d2561 100644
--- a/src/TreeOfLife_toolbox/main/verification.py
+++ b/src/TreeOfLife_toolbox/main/verification.py
@@ -3,11 +3,11 @@
 
 import pandas as pd
 
-from DD_tools.main.checkpoint import Checkpoint
-from DD_tools.main.config import Config
-from DD_tools.main.registry import ToolsRegistryBase
-from DD_tools.main.runners import MPIRunnerTool
-from DD_tools.main.utils import init_logger
+from TreeOfLife_toolbox.main.checkpoint import Checkpoint
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.registry import ToolsRegistryBase
+from TreeOfLife_toolbox.main.runners import MPIRunnerTool
+from TreeOfLife_toolbox.main.utils import init_logger
 
 if __name__ == "__main__":
     config_path = os.environ.get("CONFIG_PATH")

From 99ca3b016b14c27482d7152bba47d6db71b46334 Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Mon, 12 May 2025 02:39:08 -0400
Subject: [PATCH 4/5] Update metadata and dependencies in pyproject.toml

Revised the project description, added programming language classifiers, and
enhanced optional dependencies with 'ruff'. Introduced new keywords and added
a script entry point for better usability.

---
 pyproject.toml | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index cb76174..b3e3a5c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,11 +13,14 @@ authors = [
     { name = "Elizabeth G. Campolongo", email = "e.campolongo479@gmail.com" },
     { name = "Matthew J. Thompson", email = "thompson.m.j@outlook.com" },
 ]
-description = "A tool for downloading files from a list of URLs in parallel."
+description = "A tool for processing datasets that were downloaded using the distributed-downloader package."
 readme = "README.md"
 requires-python = ">=3.10, <3.12"
 classifiers = [
+    "Development Status :: 4 - Beta",
     "Programming Language :: Python :: 3",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
     "License :: OSI Approved :: MIT License",
     "Operating System :: OS Independent",
 ]
@@ -52,13 +55,17 @@ dependencies = [
 ]
 
 [project.optional-dependencies]
-dev = ["pytest"]
+dev = [
+    "pytest",
+    "ruff"
+]
 
 keywords = [
     "parallel",
     "distributed",
-    "download",
     "url",
+    "mpi-applications",
+    "dataset-generation",
 ]
 
 [project.urls]
@@ -66,5 +73,8 @@ Homepage = "https://github.com/Imageomics/distributed-downloader"
 Repository = "https://github.com/Imageomics/distributed-downloader.git"
 "Bug Tracker" = "https://github.com/Imageomics/distributed-downloader/issues"
 
+[project.scripts]
+tree_of_life_toolbox = "TreeOfLife_toolbox.main.main:main"
+
 [tool.hatch.version]
 path = "src/TreeOfLife_toolbox/main/__about__.py"

From 5e64e60cdc4688c6cb2f97fb7d5d352d5d8648a1 Mon Sep 17 00:00:00 2001
From: Andrey170170
Date: Tue, 13 May 2025 01:49:49 -0400
Subject: [PATCH 5/5] Refactor FathomNet crop fix tool and update toolbox
 structure

Moved `fathom_net_crop_fix` module from `DD_tools` to `TreeOfLife_toolbox`,
updating imports and paths for consistency. Enhanced documentation, improved
cropping logic, and incorporated boundary checks to ensure accurate image
processing.
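
For context, here is a minimal sketch of the failure mode that the new bounds
checking guards against (the array shape and box values below are made up for
illustration). NumPy slicing silently wraps negative indices around instead of
raising, so an unclamped bounding box could yield an empty or wrong crop:

    import numpy as np

    img = np.zeros((100, 200, 3), dtype=np.uint8)  # height=100, width=200
    y, height = -5, 45  # a box that starts slightly above the image

    # Unclamped: -5 wraps around to row 95, so the slice 95:40 is empty
    # and no error is raised.
    bad = img[y : y + height]  # shape (0, 200, 3)

    # Clamped, as apply_filter() now does it: both ends stay inside the image.
    min_y = min(img.shape[0], max(y, 0))  # 0
    max_y = min(img.shape[0], max(y + height, 0))  # 40
    good = img[min_y:max_y]  # shape (40, 200, 3)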
---
 src/DD_tools/fathom_net_crop_fix/README.md   |   8 -
 src/DD_tools/fathom_net_crop_fix/__init__.py |   0
 src/TreeOfLife_toolbox/__init__.py           |   1 +
 .../fathom_net_crop_fix/README.md            |  81 ++++++++++
 .../fathom_net_crop_fix/__init__.py          |   5 +
 .../fathom_net_crop_fix/classes.py           | 145 ++++++++++++++----
 6 files changed, 205 insertions(+), 35 deletions(-)
 delete mode 100644 src/DD_tools/fathom_net_crop_fix/README.md
 delete mode 100644 src/DD_tools/fathom_net_crop_fix/__init__.py
 create mode 100644 src/TreeOfLife_toolbox/fathom_net_crop_fix/README.md
 create mode 100644 src/TreeOfLife_toolbox/fathom_net_crop_fix/__init__.py
 rename src/{DD_tools => TreeOfLife_toolbox}/fathom_net_crop_fix/classes.py (51%)

diff --git a/src/DD_tools/fathom_net_crop_fix/README.md b/src/DD_tools/fathom_net_crop_fix/README.md
deleted file mode 100644
index 7da7009..0000000
--- a/src/DD_tools/fathom_net_crop_fix/README.md
+++ /dev/null
@@ -1,8 +0,0 @@
-Tool to fix FathomNet cropped images (crop again from the original with an updated algorithm).
-
-Additional config fields:
-
-* `uuid_table_path` - path to the UUID table of images to re-crop
-* `look_up_table_path` - path to the lookup table with `uuid - file_name` information
-* `filtered_by_size` - original CSV that was used for cropping (contains `uuid` matches and crop coordinates)
-* `data_transfer_table` - CSV that maps each ToL dataset file to its original image
\ No newline at end of file
diff --git a/src/DD_tools/fathom_net_crop_fix/__init__.py b/src/DD_tools/fathom_net_crop_fix/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/src/TreeOfLife_toolbox/__init__.py b/src/TreeOfLife_toolbox/__init__.py
index e69de29..a4de23e 100644
--- a/src/TreeOfLife_toolbox/__init__.py
+++ b/src/TreeOfLife_toolbox/__init__.py
@@ -0,0 +1 @@
+from TreeOfLife_toolbox import fathom_net_crop_fix
diff --git a/src/TreeOfLife_toolbox/fathom_net_crop_fix/README.md b/src/TreeOfLife_toolbox/fathom_net_crop_fix/README.md
new file mode 100644
index 0000000..8ecc11e
--- /dev/null
+++ b/src/TreeOfLife_toolbox/fathom_net_crop_fix/README.md
@@ -0,0 +1,81 @@
+# FathomNet Crop Fix Tool
+
+## Overview
+
+This tool corrects improperly cropped FathomNet images by reprocessing them from the original source images using
+improved cropping algorithms. It addresses boundary issues in the previous cropping implementation by enforcing proper
+bounds checking and preventing out-of-bounds errors.
+
+The tool follows a three-stage processing pipeline (a sketch of the filter stage follows this list):
+
+1. **Filter Stage**: Identifies affected images by joining UUID tables with lookup information
+2. **Scheduler Stage**: Organizes processing by server to enable efficient parallel execution
+3. **Runner Stage**: Performs the actual image recropping using correct boundary parameters
+
+> ⚠️ **Note**: This is a specialized tool built for a specific dataset issue. It should not be used for other cases
+> without code modifications.
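+
+As a quick illustration of the filter stage, the UUID/lookup join and the server extraction reduce to the following
+(a pandas sketch with made-up paths; the actual implementation runs the same logic in Spark):
+
+```python
+import pandas as pd
+
+uuid_table = pd.DataFrame({"uuid": ["a1", "b2"]})  # images that need recropping
+lookup = pd.DataFrame(
+    {
+        "uuid": ["a1", "b2"],
+        "path": [
+            "images/server=alpha/file_0.parquet",  # made-up paths
+            "images/server=beta/file_1.parquet",
+        ],
+    }
+)
+
+merged = uuid_table.merge(lookup, on="uuid", how="left")
+merged["server"] = merged["path"].str.extract(r"server=([^/]+)", expand=False)
+print(merged["server"].tolist())  # ['alpha', 'beta']
+```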
+
+## Configuration Requirements
+
+The following fields must be defined in your configuration file:
+
+| Field                      | Description                                                                  |
+|----------------------------|------------------------------------------------------------------------------|
+| `uuid_table_path`          | Path to CSV/parquet with UUIDs of images needing recropping                  |
+| `look_up_table_path`       | Path to lookup table with `uuid` to `file_name` mapping information          |
+| `filtered_by_size`         | Path to original CSV containing bounding box coordinates and UUID matches    |
+| `data_transfer_table`      | Path to CSV mapping ToL dataset files to original image locations            |
+| `base_path`                | Base directory where images were transferred using the `data_transfer` tool  |
+| `original_image_base_path` | Base directory where original uncropped images are stored                    |
+| `image_crop_path`          | Output directory where corrected cropped images will be saved                |
+
+## Pre-Conditions
+
+For the tool to work correctly, the following conditions must be met:
+
+- Original uncropped images still exist and are accessible
+- Original images have not been modified since initial cropping
+- Initial cropping was performed using the `fathom_net_crop` tool
+- Images were transferred and restructured using the `data_transfer` tool
+- Transfer logs are available to provide mapping between new filenames and original files
+- The provided `filtered_by_size` CSV contains valid bounding box information (x, y, width, height)
+
+## Processing Details
+
+The tool applies the following corrections to each image:
+
+- Ensures crop boundaries stay within the original image dimensions
+- Applies proper bounds checking to prevent negative coordinates
+- Ensures maximum bounds do not exceed image dimensions
+- Recalculates image hashes for the properly cropped images
+- Preserves all original metadata while updating size information
+
+## Post-Conditions
+
+After successful execution:
+
+- Corrected cropped images are saved to the `image_crop_path` directory
+- Images are organized in a server-based directory structure
+- Each output file contains properly cropped images with corrected dimensions
+- Each cropped image maintains its original UUID and source identification
+- New hashsum values are calculated for the corrected images
+- Verification data is created to track processing completion
+
+## Usage Notes
+
+The tool is designed to run in a distributed environment using MPI. It handles processing in batches by server to
+maximize efficiency and manages timeouts to ensure job completion within allocation constraints.
+
+**Technical Implementation**: The core fix applies proper boundary checking to ensure crop coordinates are within valid
+image dimensions:
+
+```python
+# Calculate corrected crop coordinates with proper bounds checking
+min_y = min(image_size[0], max(row["y"], 0))
+min_x = min(image_size[1], max(row["x"], 0))
+max_y = min(image_size[0], max(row["y"] + row["height"], 0))
+max_x = min(image_size[1], max(row["x"] + row["width"], 0))
+```
+
+This prevents both negative coordinates and exceeding image dimensions, which were the main causes of errors in the
+original cropping implementation.
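+
+As a usage-style sanity check (reusing `image_np`, the decoded `uint8` image array, and the clamped bounds from the
+snippet above), the recropped buffer must contain exactly `height * width * 3` bytes of RGB data; the runner asserts
+this for every image it writes:
+
+```python
+image_cropped = image_np[min_y:max_y, min_x:max_x]
+assert len(image_cropped.tobytes()) == (max_y - min_y) * (max_x - min_x) * 3
+```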
\ No newline at end of file
diff --git a/src/TreeOfLife_toolbox/fathom_net_crop_fix/__init__.py b/src/TreeOfLife_toolbox/fathom_net_crop_fix/__init__.py
new file mode 100644
index 0000000..8c6d79d
--- /dev/null
+++ b/src/TreeOfLife_toolbox/fathom_net_crop_fix/__init__.py
@@ -0,0 +1,5 @@
+from .classes import (
+    FathomnetCropFixFilter,
+    FathomnetCropFixScheduleCreation,
+    FathomnetCropFixRunner,
+)
diff --git a/src/DD_tools/fathom_net_crop_fix/classes.py b/src/TreeOfLife_toolbox/fathom_net_crop_fix/classes.py
similarity index 51%
rename from src/DD_tools/fathom_net_crop_fix/classes.py
rename to src/TreeOfLife_toolbox/fathom_net_crop_fix/classes.py
index 8c9d9bd..c5000a0 100644
--- a/src/DD_tools/fathom_net_crop_fix/classes.py
+++ b/src/TreeOfLife_toolbox/fathom_net_crop_fix/classes.py
@@ -7,23 +7,44 @@
 import pandas as pd
 import pyspark.sql.functions as func
 
-from DD_tools.main.config import Config
-from DD_tools.main.filters import FilterRegister, SparkFilterToolBase
-from DD_tools.main.runners import MPIRunnerTool, RunnerRegister
-from DD_tools.main.schedulers import DefaultScheduler, SchedulerRegister
-from DD_tools.main.utils import load_dataframe
+from TreeOfLife_toolbox.main.config import Config
+from TreeOfLife_toolbox.main.filters import FilterRegister, SparkFilterToolBase
+from TreeOfLife_toolbox.main.runners import MPIRunnerTool, RunnerRegister
+from TreeOfLife_toolbox.main.schedulers import DefaultScheduler, SchedulerRegister
+from TreeOfLife_toolbox.main.utils import load_dataframe
 
 server_pattern = r"server=([^/]+)"
 
 
 @FilterRegister("fathom_net_crop_fix")
 class FathomnetCropFixFilter(SparkFilterToolBase):
+    """
+    Filter to prepare data for the FathomNet crop fix operation.
+
+    This class loads UUID tables with incorrectly cropped images and lookup tables, then joins them to create
+    a mapping between UUIDs and their corresponding file paths. The resulting
+    data is partitioned by server for distributed processing.
+    """
+
     def __init__(self, cfg: Config):
+        """
+        Initialize the FathomNet crop fix filter with configuration.
+
+        Args:
+            cfg (Config): Configuration object with paths and settings.
+        """
         super().__init__(cfg)
 
         self.filter_name: str = "fathom_net_crop_fix"
 
     def run(self):
+        """
+        Run the filter to create a table of images that need to be reprocessed.
+
+        Loads the UUID table with incorrectly cropped images and lookup table, joins them to match UUIDs with
+        file paths, extracts server information from paths, and saves the result
+        as a CSV in the filter table directory.
+        """
         uuid_table_df = load_dataframe(
             self.spark, self.config["uuid_table_path"]
         ).repartition(20)
@@ -46,36 +67,79 @@
 @SchedulerRegister("fathom_net_crop_fix")
 class FathomnetCropFixScheduleCreation(DefaultScheduler):
+    """
+    Scheduler for FathomNet crop fix operations.
+
+    Creates a schedule for distributed processing by server, ensuring
+    all images from the same server are processed together.
+    """
+
     def __init__(self, cfg: Config):
+        """
+        Initialize the scheduler with configuration.
+
+        Args:
+            cfg (Config): Configuration object with settings.
+        """
         super().__init__(cfg)
 
         self.filter_name: str = "fathom_net_crop_fix"
-        self.scheme = ["server"]
+        self.scheme = ["server"]  # Group tasks by server
 
 
 @RunnerRegister("fathom_net_crop_fix")
 class FathomnetCropFixRunner(MPIRunnerTool):
+    """
+    Runner for FathomNet crop fix operations.
+
+    This class implements the actual cropping correction logic. It loads
+    original images, applies the correct cropping parameters, and saves
+    the properly cropped images to the specified output location.
+    """
+
     def __init__(self, cfg: Config):
+        """
+        Initialize the runner with configuration and load necessary data tables.
+
+        Args:
+            cfg (Config): Configuration object with paths and settings.
+        """
         super().__init__(cfg)
         self.filter_name: str = "fathom_net_crop_fix"
         self.data_scheme: List[str] = ["uuid", "server", "path"]
         self.verification_scheme: List[str] = ["server"]
-        self.total_time = 150
+        self.total_time = 150  # Time buffer for timeout checking in seconds
 
+        # Load reference dataframes
         self.data_transfer_df = pd.read_csv(cfg["data_transfer_table"])
-        self.bb_df = pd.read_csv(cfg["filtered_by_size"])
-        self.image_crop_path = os.path.join(
-            cfg.get_folder("path_to_output_folder"), "image_crop"
-        )
-        self.base_path = (
-            "/fs/scratch/PAS2136/gbif/processed/fathomNet/images_full/source=fathomNet"
-        )
-        self.original_image_base_path = (
-            "/fs/scratch/PAS2136/gbif/processed/fathomNet/images_full/downloaded_images"
-        )
+        self.bb_df = pd.read_csv(cfg["filtered_by_size"])  # Bounding box information
+
+        # Set path configurations
+        self.image_crop_path = cfg["image_crop_path"]  # Output path for corrected crops
+        self.base_path = cfg["base_path"]  # Base path for current dataset
+        self.original_image_base_path = cfg[
+            "original_image_base_path"
+        ]  # Path to original uncropped images
 
     def apply_filter(self, filtering_df: pd.DataFrame, server_name: str) -> int:
+        """
+        Apply the cropping fix to images from a specific server.
+
+        This function:
+        1. Finds the matching UUIDs that need fixing
+        2. For each path, loads the original uncropped image
+        3. Applies the correct cropping parameters
+        4. Saves the properly cropped images
+
+        Args:
+            filtering_df (pd.DataFrame): DataFrame with UUIDs and paths to process
+            server_name (str): Name of the server being processed
+
+        Returns:
+            int: Number of images successfully processed
+        """
         self.is_enough_time()
+        # Find matching UUIDs that need cropping fixes
         uuids_df = self.bb_df.merge(
             filtering_df[["uuid"]],
             left_on="tol_uuid",
@@ -88,27 +152,32 @@ def apply_filter(self, filtering_df: pd.DataFrame, server_name: str) -> int:
             right_on="uuid",
             how="inner",
             validate="1:1",
         )
         cropped_images = []
 
         for full_path, images_df in filtering_df.groupby("path"):
             assert isinstance(full_path, str), "Not a string"
 
+            # Find the original image path using the data transfer mapping table
             file_name = os.path.basename(full_path)
             original_image_path = (
-                self.original_image_base_path
-                + self.data_transfer_df[
-                    self.data_transfer_df["dst_path"]
-                    == os.path.join(self.base_path, f"server={server_name}", file_name)
-                ].iloc[0]["src_path"][67:]
+                    self.original_image_base_path
+                    + self.data_transfer_df[
+                        self.data_transfer_df["dst_path"]
+                        == os.path.join(self.base_path, f"server={server_name}", file_name)
+                    ].iloc[0]["src_path"][
+                        67:
+                    ]  # Specific to our dataset, will need to be changed in yours
             )
 
             if not os.path.exists(original_image_path):
                 self.logger.info(f"Path doesn't exist: {original_image_path}")
                 return 0
 
+            # Load the original full-sized images that need fixing
             full_image = pd.read_parquet(
                 original_image_path,
                 filters=[("source_id", "in", uuids_df["image_uuid"])],
             )
 
-            self.is_enough_time()
+            self.is_enough_time()  # Check if we still have enough time to continue
 
             columns = full_image.columns
+            # Merge to get bounding box information for each image
             full_image = full_image.merge(
                 self.bb_df,
                 left_on="source_id",
@@ -118,21 +187,26 @@ def apply_filter(self, filtering_df: pd.DataFrame, server_name: str) -> int:
                 right_on="image_uuid",
                 how="inner",
                 validate="1:m",
             )
 
             for _, row in full_image.iterrows():
+                # Create a new entry for the fixed cropped image
                 cropped_entry = row[columns].to_dict()
                 image_binary = row["image"]
                 image_size = row["resized_size"]
+
+                # Convert binary image data to numpy array
                 image_np = np.frombuffer(image_binary, dtype=np.uint8).reshape(
                     [image_size[0], image_size[1], 3]
                 )
 
-                # fix
+                # Calculate corrected crop coordinates with proper bounds checking
                 min_y = min(image_size[0], max(row["y"], 0))
                 min_x = min(image_size[1], max(row["x"], 0))
                 max_y = min(image_size[0], max(row["y"] + row["height"], 0))
                 max_x = min(image_size[1], max(row["x"] + row["width"], 0))
 
+                # Crop the image with corrected coordinates
                 image_cropped = image_np[min_y:max_y, min_x:max_x]
 
+                # Update entry with the new cropped image data
                 cropped_entry["image"] = image_cropped.tobytes()
                 cropped_entry["resized_size"] = (max_y - min_y, max_x - min_x)
                 cropped_entry["hashsum_resized"] = hashlib.md5(
@@ -141,18 +215,22 @@ def apply_filter(self, filtering_df: pd.DataFrame, server_name: str) -> int:
                 cropped_entry["uuid"] = row["tol_uuid"]
                 cropped_entry["source_id"] = row["bb_uuid"]
 
+                # Verify the image data size matches dimensions
                 assert len(cropped_entry["image"]) == (
-                    cropped_entry["resized_size"][0]
-                    * cropped_entry["resized_size"][1]
-                    * 3
+                        cropped_entry["resized_size"][0]
+                        * cropped_entry["resized_size"][1]
+                        * 3
                 ), f"Size mismatch for {row['tol_uuid']}"
 
                 cropped_images.append(cropped_entry)
 
         self.is_enough_time()
+        # Create a DataFrame from all processed images
         cropped_image = pd.DataFrame(cropped_images)
         output_path = os.path.join(self.image_crop_path, f"server={server_name}")
         os.makedirs(output_path, exist_ok=True)
+
+        # Save the corrected images with a unique filename
         cropped_image.to_parquet(
             os.path.join(output_path, f"data_{uuid.uuid4()}.parquet"),
             index=False,
@@ -163,6 +241,18 @@ def apply_filter(self, filtering_df: pd.DataFrame, server_name: str) -> int:
 
         return len(cropped_image)
 
     def runner_fn(self, df_local: pd.DataFrame) -> int:
+        """
+        Process function called for each batch of data in the distributed execution.
+
+        Handles the execution of the crop fix operation for a specific server,
+        with error handling and reporting.
+
+        Args:
+            df_local (pd.DataFrame): Local partition of the data to process
+
+        Returns:
+            int: 1 if successful, 0 if failed
+        """
         filtering_df = df_local.reset_index(drop=True)
         server_name = filtering_df.iloc[0]["server"]
         try:
@@ -174,6 +264,7 @@ def runner_fn(self, df_local: pd.DataFrame) -> int:
             self.logger.exception(e)
             self.logger.error(f"Error occurred: {e}")
             return 0
         else:
+            # Log successful completion for verification
             print(f"{server_name}", end="\n", file=self.verification_IO)
             self.logger.debug(
                 f"Completed filtering: {server_name} with {filtered_parquet_length}"
             )