From f27a6e00e0462bac7b01c8191042c6491830ed71 Mon Sep 17 00:00:00 2001 From: Kyle Hippe Date: Mon, 14 Oct 2024 20:09:19 +0000 Subject: [PATCH 1/2] Adding more gitignore --- .gitignore | 166 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..894e3e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,166 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + + +# Other ignores +cache/ \ No newline at end of file From 270402ec0f4df709ce7a535e30b34d247fa362cf Mon Sep 17 00:00:00 2001 From: Kyle Hippe Date: Mon, 14 Oct 2024 20:09:50 +0000 Subject: [PATCH 2/2] Minimal changes for parsl minhash computing --- deduplication/__main__.py | 137 ++++++++++++++--- deduplication/args.py | 256 +++++++++++++++++++------------ deduplication/lshbloom.py | 29 +++- deduplication/minhash.py | 213 ++++++++++++++------------ deduplication/parsl_conf.py | 290 ++++++++++++++++++++++++++++++++++++ deduplication/workflows.py | 160 ++++++++++++++++---- 6 files changed, 830 insertions(+), 255 deletions(-) create mode 100644 deduplication/parsl_conf.py diff --git a/deduplication/__main__.py b/deduplication/__main__.py index ee61c36..ca02cf2 100644 --- a/deduplication/__main__.py +++ b/deduplication/__main__.py @@ -1,25 +1,118 @@ -from deduplication.workflows import * from deduplication.args import parse_args +from deduplication.parsl_conf import get_compute_settings +from deduplication.workflows import ( + dedup_multi_bloom, + dedup_multi_lsh, + dedup_single_bloom, + dedup_single_file_bloom, + dedup_single_file_lsh, + dedup_single_lsh, +) -args = parse_args() - -if args.mode == "bloom": - if args.single: - assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" - dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing) - elif args.multi: - dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing) - else: - assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" - dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing) -else: - if args.single: - assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" - dedup_single_lsh(args.input[0], args.minhash_dir[0], args.output_file, args.name[0], args.sim_threshold, args.num_perm, redis_port=args.redis_port, compute_minhashes=not args.skip_minhashing) - elif args.multi: - dedup_multi_lsh(args.input, args.minhash_dir, args.output_file, args.name, args.sim_threshold, args.num_perm, redis_port=args.redis_port, compute_minhashes=not 
args.skip_minhashing) - else: - assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" - dedup_single_file_lsh(args.input[0], args.minhash_dir[0], args.output_file, args.name[0], args.sim_threshold, args.num_perm, redis_port=args.redis_port, compute_minhashes=not args.skip_minhashing) - +if __name__ == "__main__": + args = parse_args() + # Setup parsl if applicable + parsl_config = None + if args.parsl_config is not None: + parsl_config = get_compute_settings(args) + if args.mode == "bloom": + if args.single: + assert ( + len(args.input) == 1 + and len(args.minhash_dir) == 1 + and len(args.name) == 1 + ), "Expected single input argument but got a list" + dedup_single_bloom( + args.input[0], + args.minhash_dir[0], + args.num, + args.fp, + args.output_file, + args.name[0], + args.sim_threshold, + args.num_perm, + args.save_dir, + not args.skip_minhashing, + args.clear, + parsl_config, + ) + elif args.multi: + dedup_multi_bloom( + args.input, + args.minhash_dir, + args.num, + args.fp, + args.output_file, + args.name, + args.sim_threshold, + args.num_perm, + args.save_dir, + not args.skip_minhashing, + args.clear, + parsl_config, + ) + else: + assert ( + len(args.input) == 1 + and len(args.minhash_dir) == 1 + and len(args.name) == 1 + ), "Expected single input argument but got a list" + dedup_single_file_bloom( + args.input[0], + args.minhash_dir[0], + args.num, + args.fp, + args.output_file, + args.name[0], + args.sim_threshold, + args.num_perm, + args.save_dir, + not args.skip_minhashing, + args.clear, + parsl_config, + ) + else: + if args.single: + assert ( + len(args.input) == 1 + and len(args.minhash_dir) == 1 + and len(args.name) == 1 + ), "Expected single input argument but got a list" + dedup_single_lsh( + args.input[0], + args.minhash_dir[0], + args.output_file, + args.name[0], + args.sim_threshold, + args.num_perm, + redis_port=args.redis_port, + compute_minhashes=not args.skip_minhashing, + ) + elif args.multi: + dedup_multi_lsh( + args.input, + args.minhash_dir, + args.output_file, + args.name, + args.sim_threshold, + args.num_perm, + redis_port=args.redis_port, + compute_minhashes=not args.skip_minhashing, + ) + else: + assert ( + len(args.input) == 1 + and len(args.minhash_dir) == 1 + and len(args.name) == 1 + ), "Expected single input argument but got a list" + dedup_single_file_lsh( + args.input[0], + args.minhash_dir[0], + args.output_file, + args.name[0], + args.sim_threshold, + args.num_perm, + redis_port=args.redis_port, + compute_minhashes=not args.skip_minhashing, + ) diff --git a/deduplication/args.py b/deduplication/args.py index ce7e792..18639bb 100644 --- a/deduplication/args.py +++ b/deduplication/args.py @@ -1,101 +1,165 @@ -import sys import argparse +import sys + # cmd arguments def parse_args(): - cmd_args = " ".join(sys.argv) - parser = argparse.ArgumentParser( - description="CLI Tool for Text Deduplication using MinHashLSH", - formatter_class=argparse.RawTextHelpFormatter - ) - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "--single", - action="store_true", - help="Deduplicate a single corpus against the index", - ) - group.add_argument( - "--multi", - action="store_true", - help="Deduplicate multiple corpora against the index", - ) - group.add_argument( - "--file", - action="store_true", - help="Deduplicate a single JSONL file (may have multiple documents) against the index", - ) - parser.add_argument( - "--name", - help="Name(s) of corpus we are 
deduplicating", - required=True, - nargs="+", - ) - parser.add_argument( - "--input", - help=" Directory or directories where jsonl data is stored\n JSONL file to deduplicate", - required=True, - nargs="+", - ) - parser.add_argument( - "--minhash-dir", - help="Output directory where pickled minhash signatures will be stored", - required=True, - nargs="+", - ) - parser.add_argument( - "--output-file", - help="Path to csv file where duplicates will be logged", - required=True, - ) - parser.add_argument( - "--sim-threshold", - help="Jaccard Similarity threshold for deduplication, should be in [0, 1]. Default is 0.8", - default=0.8, - ) - parser.add_argument( - "--num-perm", - help="Number of hash functions for MinHashing. Default is 128", - default=128, - ) - parser.add_argument( - "--mode", - default="bloom", - choices=["lsh", "bloom"], - help="Whether to use classic MinHashLSH or LSHBloom, default is LSHBloom", - ) - parser.add_argument( - "--save-dir", - help=" Directory where Bloom Index will be stored", - required=("--mode lsh" not in cmd_args) - ) - parser.add_argument( - "-n", - "--num", - type=int, - help=" Total size of text dataset in number of documents", - required=("--mode lsh" not in cmd_args), - ) - parser.add_argument( - "--fp", - type=float, - help=" False Positive rate for Bloom Filter, should be in [0,1]. Default is 0.001 (0.1%%)", - default=0.001, - ) - parser.add_argument( - "--clear", - help=" If set, will remove the bloom filter index in save-dir as well as any results csv and start from scratch (Warning: this can not be undone)", - action="store_true" - ) - parser.add_argument( - "--redis_port", - help=" The port that Redis server is listening on. Default is 6379", - type=int, - default=6379, - ) - parser.add_argument( - "--skip-minhashing", - help="If set, will skip the minhashing step of each workflow (useful if minhashes have been precomputed at minhash_dir)", - action="store_true" - ) + cmd_args = " ".join(sys.argv) + parser = argparse.ArgumentParser( + description="CLI Tool for Text Deduplication using MinHashLSH", + formatter_class=argparse.RawTextHelpFormatter, + ) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "--single", + action="store_true", + help="Deduplicate a single corpus against the index", + ) + group.add_argument( + "--multi", + action="store_true", + help="Deduplicate multiple corpora against the index", + ) + group.add_argument( + "--file", + action="store_true", + help="Deduplicate a single JSONL file (may have multiple documents) against the index", + ) + parser.add_argument( + "--name", + help="Name(s) of corpus we are deduplicating", + required=True, + nargs="+", + ) + parser.add_argument( + "--input", + help=" Directory or directories where jsonl data is stored\n JSONL file to deduplicate", + required=True, + nargs="+", + ) + parser.add_argument( + "--minhash-dir", + help="Output directory where pickled minhash signatures will be stored", + required=True, + nargs="+", + ) + parser.add_argument( + "--output-file", + help="Path to csv file where duplicates will be logged", + required=True, + ) + parser.add_argument( + "--sim-threshold", + help="Jaccard Similarity threshold for deduplication, should be in [0, 1]. Default is 0.8", + default=0.8, + type=float, + ) + parser.add_argument( + "--num-perm", + help="Number of hash functions for MinHashing. 
Default is 128", + default=128, + type=int, + ) + parser.add_argument( + "--mode", + default="bloom", + choices=["lsh", "bloom"], + help="Whether to use classic MinHashLSH or LSHBloom, default is LSHBloom", + ) + parser.add_argument( + "--save-dir", + help=" Directory where Bloom Index will be stored", + required=("--mode lsh" not in cmd_args), + ) + parser.add_argument( + "-n", + "--num", + type=int, + help=" Total size of text dataset in number of documents", + required=("--mode lsh" not in cmd_args), + ) + parser.add_argument( + "--fp", + type=float, + help=" False Positive rate for Bloom Filter, should be in [0,1]. Default is 0.001 (0.1%%)", + default=0.001, + ) + parser.add_argument( + "--clear", + help=" If set, will remove the bloom filter index in save-dir as well as any results csv and start from scratch (Warning: this can not be undone)", + action="store_true", + ) + parser.add_argument( + "--redis_port", + help=" The port that Redis server is listening on. Default is 6379", + type=int, + default=6379, + ) + parser.add_argument( + "--skip-minhashing", + help="If set, will skip the minhashing step of each workflow (useful if minhashes have been precomputed at minhash_dir)", + action="store_true", + ) + + # Add Parsl argument group + parsl_group = parser.add_argument_group( + "parsl_settings", "Parsl settings for distributed computing." + ) + parsl_group.add_argument( + "--parsl-config", + choices=["local", "workstation", "polaris"], + type=str, + ) + parsl_group.add_argument( + "--num-nodes", + type=int, + help="Number of nodes to request. Default is 1.", + default=1, + ) + parsl_group.add_argument( + "--worker-init", + type=str, + help="Command to initialize the worker, e.g., loading modules or environments.", + default="", + ) + parsl_group.add_argument( + "--scheduler-options", + type=str, + help="PBS directives, e.g., '-l filesystems=home:eagle:grand'.", + ) + parsl_group.add_argument( + "--account", type=str, help="Account to charge compute to." + ) + parsl_group.add_argument( + "--queue", type=str, help="Queue to submit jobs to, typically 'prod'." + ) + parsl_group.add_argument("--walltime", type=str, help="Maximum job time.") + parsl_group.add_argument( + "--cpus-per-node", + type=int, + help="Number of CPUs per node. Default is 64.", + default=64, + ) + parsl_group.add_argument( + "--cores-per-worker", + type=int, + help="Number of cores per worker. Default is 4.", + default=4, + ) + parsl_group.add_argument( + "--retries", + type=int, + help="Number of retries upon failure. Default is 0.", + default=0, + ) + parsl_group.add_argument( + "--worker-debug", action="store_true", help="Enable worker debug mode." 
+ ) + parsl_group.add_argument( + "--log-dir", + type=str, + help="Directory in which to store Parsl run files.", + default="./parsl", + ) - return parser.parse_args() + return parser.parse_args() diff --git a/deduplication/lshbloom.py b/deduplication/lshbloom.py index 7a0b5d6..904092d 100644 --- a/deduplication/lshbloom.py +++ b/deduplication/lshbloom.py @@ -1,10 +1,23 @@ -from tqdm.autonotebook import tqdm -from multiprocessing import Pool -from datasketch import MinHashLSHBloom -from typing import List, Tuple, Dict -from functools import partial -import pickle import os +import pickle +from typing import Any, Dict, List, Tuple + +from datasketch import MinHashLSHBloom +from tqdm.autonotebook import tqdm + + +def run_LSHBloom( + minhash_dir: str, lsh_params: dict[str, Any], csvfile: str, corpus_name: str +) -> None: + """Run the LSHBloom deduplication process on a corpus of documents.""" + # TODO: this is a bit of an overreach, need to think about how to not propogate + # duplicates back to pilot if running with parsl + from deduplication.writers import write_duplicates_to_csv + + index = LSHBloom(minhash_dir, lsh_params) + duplicates = index.deduplicate_corpus() + write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"]) + class LSHBloom: """ @@ -14,13 +27,14 @@ class LSHBloom: ``` # minhash signatures are computed or already exist in minhashdir m = MinHasher(indir, minhashdir) - m.process() + m.process() lsh_params = {...} index = LSHBloom(minhashdir, lsh_params) index.deduplicate_corpus() # creates index and stores based on lsh_params ``` """ + def __init__(self, minhash_dir: str, lsh_params: Dict): """ minhash_dir: path to directory of pickled minhash signatures @@ -97,4 +111,3 @@ def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]: pbar.update() return duplicate_list - diff --git a/deduplication/minhash.py b/deduplication/minhash.py index f905a0c..a38bd71 100644 --- a/deduplication/minhash.py +++ b/deduplication/minhash.py @@ -1,106 +1,125 @@ -from tqdm.autonotebook import tqdm -from multiprocessing import Pool -from datasketch import MinHash -from typing import Optional -from glob import glob -import pickle import json -from functools import partial import os +import pickle +from functools import partial +from glob import glob +from multiprocessing import Pool +from typing import Optional + +from datasketch import MinHash +from tqdm.autonotebook import tqdm # TODO check if minhashes already exist, recompute only if forced + def compute_minhash_jsonl(t, fname, num_perm): - lineNo, line = t - lineNo += 1 - line = json.loads(line) - line = line.get("text", "") - s = set(line.split()) - if not s: - return None - m = MinHash(num_perm=num_perm) - for d in s: - m.update(d.encode("utf8")) - # generate a unique key for this document - key = f"{fname}-{lineNo}" - return (key, m) - -def compute_minhash_for_file(infile: str, output_dir: str, num_perm: int): - """ - Compute minhash signatures for a given jsonl file with the format specified for - 'compute_minhash_jsonl' above. 
- - infile is the path to the singular jsonl file - will store the minhash signatures in self.output_dir - """ - n = 50000 - fname = infile.split("/")[-1] - with open(infile) as fin, Pool(32) as p, tqdm(total=n, desc=fname) as pbar: - minhash_list = list() - partial_compute_minhash = partial(compute_minhash_jsonl, fname=fname, num_perm=num_perm) - for result in p.imap_unordered(partial_compute_minhash, enumerate(fin)): - # for t in enumerate(fin): - # result = partial_compute_minhash(t) - if result: - minhash_list.append(result) - pbar.update() - with open(f"{output_dir}/{fname[:-6]}.pkl", "wb") as fp: - pickle.dump(minhash_list, fp) - print(f"Generated MinHash for {len(minhash_list):,} documents in {fname}") + lineNo, line = t + lineNo += 1 + line = json.loads(line) + line = line.get("text", "") + s = set(line.split()) + if not s: + return None + m = MinHash(num_perm=num_perm) + for d in s: + m.update(d.encode("utf8")) + # generate a unique key for this document + key = f"{fname}-{lineNo}" + return (key, m) + + +def compute_minhash_for_file( + infile: str, + output_dir: str, + num_perm: int, + num_cpus: int = 32, + progress: bool = True, +): + """ + Compute minhash signatures for a given jsonl file with the format specified for + 'compute_minhash_jsonl' above. + + infile is the path to the singular jsonl file + will store the minhash signatures in self.output_dir + """ + n = 50000 + fname = str(infile).split("/")[-1] + + if progress: + pbar = tqdm(total=n, desc=fname) + else: + pbar = None + + with open(infile) as fin, Pool(num_cpus) as p: + minhash_list = list() + partial_compute_minhash = partial( + compute_minhash_jsonl, fname=fname, num_perm=num_perm + ) + for result in p.imap_unordered(partial_compute_minhash, enumerate(fin)): + if result: + minhash_list.append(result) + if pbar: + pbar.update() + with open(f"{output_dir}/{fname[:-6]}.pkl", "wb") as fp: + pickle.dump(minhash_list, fp) + print(f"Generated MinHash for {len(minhash_list):,} documents in {fname}") + class MinHasher: - """ - Handles computing minhash signatures using datasketch - - Example usage: - ``` - indir = "/data/jsonl_data/" - outdir = "/data/minhashes/" - m = MinHasher(indir, outdir) - m.process() # signatures will be stored in outdir - ``` - """ - def __init__(self, jsonl_dir: str, output_dir: str, num_perm: int = 128): - """ - jsonl_dir: path to jsonl files for the given corpus - output_dir: path to save minhash signatures to for the given corpus - """ - self.input_dir = jsonl_dir - self.output_dir = output_dir - self.num_perm = num_perm - - os.makedirs(self.output_dir, exist_ok=True) - - - def process(self): - """ - Compute minhash signatures for a directory of jsonl files with the format specified for - 'self.compute_minhash_jsonl'. - """ - for infile in glob(f"{self.input_dir}/*.jsonl"): - self.compute_minhash_for_file(infile) - - def compute_minhash_jsonl(self, t: tuple, fname: str) -> Optional[tuple]: - """ - This allows us to ingest text data and compute minhash signatures from jsonl files. - Each json object may have arbitrary metadata but should store relevant text data for training using the - 'text' key. For example, a valid object might look like: - - { - title: 'My Article', - meta: {pub_date: ...}, - text: {'Some text for training...'} - } - """ - return compute_minhash_jsonl(t, fname, self.num_perm) - - def compute_minhash_for_file(self, infile: str): - """ - Compute minhash signatures for a given jsonl file with the format specified for - 'compute_minhash_jsonl' above. 
- - infile is the path to the singular jsonl file - will store the minhash signatures in self.output_dir - """ - compute_minhash_for_file(infile, self.output_dir, self.num_perm) + """ + Handles computing minhash signatures using datasketch + + Example usage: + ``` + indir = "/data/jsonl_data/" + outdir = "/data/minhashes/" + m = MinHasher(indir, outdir) + m.process() # signatures will be stored in outdir + ``` + """ + + def __init__( + self, jsonl_dir: str, output_dir: str, num_perm: int = 128, num_cpus: int = 32 + ): + """ + jsonl_dir: path to jsonl files for the given corpus + output_dir: path to save minhash signatures to for the given corpus + """ + self.input_dir = jsonl_dir + self.output_dir = output_dir + self.num_perm = num_perm + self.num_cpus = num_cpus + + os.makedirs(self.output_dir, exist_ok=True) + + def process(self): + """ + Compute minhash signatures for a directory of jsonl files with the format specified for + 'self.compute_minhash_jsonl'. + """ + for infile in glob(f"{self.input_dir}/*.jsonl"): + self.compute_minhash_for_file(infile) + + def compute_minhash_jsonl(self, t: tuple, fname: str) -> Optional[tuple]: + """ + This allows us to ingest text data and compute minhash signatures from jsonl files. + Each json object may have arbitrary metadata but should store relevant text data for training using the + 'text' key. For example, a valid object might look like: + + { + title: 'My Article', + meta: {pub_date: ...}, + text: {'Some text for training...'} + } + """ + return compute_minhash_jsonl(t, fname, self.num_perm) + + def compute_minhash_for_file(self, infile: str): + """ + Compute minhash signatures for a given jsonl file with the format specified for + 'compute_minhash_jsonl' above. + infile is the path to the singular jsonl file + will store the minhash signatures in self.output_dir + """ + compute_minhash_for_file(infile, self.output_dir, self.num_perm, self.num_cpus) diff --git a/deduplication/parsl_conf.py b/deduplication/parsl_conf.py new file mode 100644 index 0000000..18fd773 --- /dev/null +++ b/deduplication/parsl_conf.py @@ -0,0 +1,290 @@ +"""Utilities to build Parsl configurations.""" + +from __future__ import annotations + +import json +from abc import ABC, abstractmethod +from argparse import Namespace +from pathlib import Path + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal # type: ignore [assignment] + +from typing import ParamSpec, Sequence, TypeVar, Union + +import yaml +from parsl.config import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import MpiExecLauncher +from parsl.providers import LocalProvider, PBSProProvider +from parsl.utils import get_all_checkpoints +from pydantic import BaseModel as _BaseModel +from pydantic import Field + +T = TypeVar("T") +P = ParamSpec("P") + +PathLike = Union[str, Path] + + +class BaseModel(_BaseModel): + """An interface to add JSON/YAML serialization to Pydantic models.""" + + def write_json(self, path: PathLike) -> None: + """Write the model to a JSON file. + + Parameters + ---------- + path : str + The path to the JSON file. + """ + with open(path, "w") as fp: + json.dump(self.dict(), fp, indent=2) + + @classmethod + def from_json(cls: type[T], path: PathLike) -> T: + """Load the model from a JSON file. + + Parameters + ---------- + path : str + The path to the JSON file. + + Returns + ------- + T + A specific BaseModel instance. 
+ """ + with open(path) as fp: + data = json.load(fp) + return cls(**data) + + def write_yaml(self, path: PathLike) -> None: + """Write the model to a YAML file. + + Parameters + ---------- + path : str + The path to the YAML file. + """ + with open(path, mode="w") as fp: + yaml.dump(json.loads(self.json()), fp, indent=4, sort_keys=False) + + @classmethod + def from_yaml(cls: type[T], path: PathLike) -> T: + """Load the model from a YAML file. + + Parameters + ---------- + path : PathLike + The path to the YAML file. + + Returns + ------- + T + A specific BaseModel instance. + """ + with open(path) as fp: + raw_data = yaml.safe_load(fp) + return cls(**raw_data) + + +class BaseComputeSettings(BaseModel, ABC): + """Compute settings (HPC platform, number of GPUs, etc).""" + + parsl_config: Literal[""] = "" + """Name of the platform to use.""" + log_dir: PathLike = Field( + default=Path("./parsl"), description="Directory to store Parsl logs." + ) + """Path to store Parsl logs.""" + + @abstractmethod + def get_config(self) -> Config: + """Create a new Parsl configuration. + + Parameters + ---------- + run_dir : PathLike + Path to store parsl logs. + + Returns + ------- + Config + Parsl configuration. + """ + ... + + +class LocalSettings(BaseComputeSettings): + """Settings for a local machine (mainly for testing purposes).""" + + parsl_config: Literal["local"] = "local" # type: ignore[assignment] + max_workers: int = 1 + cores_per_worker: float = 0.0001 + worker_port_range: tuple[int, int] = (10000, 20000) + label: str = "htex" + + def get_config(self, run_dir: PathLike) -> Config: + """Create a parsl configuration for testing locally.""" + return Config( + run_dir=str(self.log_dir), + strategy=None, + executors=[ + HighThroughputExecutor( + address="localhost", + label=self.label, + max_workers=self.max_workers, + cores_per_worker=self.cores_per_worker, + worker_port_range=self.worker_port_range, + provider=LocalProvider(init_blocks=1, max_blocks=1), + ), + ], + ) + + +class WorkstationSettings(BaseComputeSettings): + """Settings for a workstation with GPUs.""" + + parsl_config: Literal["workstation"] = "workstation" # type: ignore[assignment] + """Name of the platform.""" + available_accelerators: Union[int, Sequence[str]] = 8 # noqa: UP007 + """Number of GPU accelerators to use.""" + worker_port_range: tuple[int, int] = (10000, 20000) + """Port range.""" + retries: int = 1 + label: str = "htex" + + def get_config(self) -> Config: + """Create a parsl configuration for running on a workstation.""" + return Config( + run_dir=str(self.log_dir), + retries=self.retries, + executors=[ + HighThroughputExecutor( + address="localhost", + label=self.label, + cpu_affinity="block", + available_accelerators=self.available_accelerators, + worker_port_range=self.worker_port_range, + provider=LocalProvider(init_blocks=1, max_blocks=1), + ), + ], + ) + + +class PolarisSettings(BaseComputeSettings): + """Polaris@ALCF settings. + + See here for details: https://docs.alcf.anl.gov/polaris/workflows/parsl/ + """ + + parsl_config: Literal["polaris"] = "polaris" # type: ignore[assignment] + label: str = "htex" + + num_nodes: int = 1 + """Number of nodes to request""" + worker_init: str = "" + """How to start a worker. 
Should load any modules and environments.""" + scheduler_options: str = "#PBS -l filesystems=home:eagle:grand" + """PBS directives, pass -J for array jobs.""" + account: str + """The account to charge compute to.""" + queue: str + """Which queue to submit jobs to, will usually be prod.""" + walltime: str + """Maximum job time.""" + cpus_per_node: int = 64 + """This must correspond to the 'depth' argument for MPI.""" + cores_per_worker: int = 4 + """Number of cores per worker.""" + retries: int = 0 + """Number of retries upon failure.""" + worker_debug: bool = False + """Enable worker debug.""" + + def get_config(self) -> Config: + """Create a parsl configuration for running on Polaris@ALCF. + + We will launch 4 workers per node, each pinned to a different GPU. + + Parameters + ---------- + run_dir: PathLike + Directory in which to store Parsl run files. + """ + run_dir = str(self.log_dir) + checkpoints = get_all_checkpoints(run_dir) + + config = Config( + executors=[ + HighThroughputExecutor( + label=self.label, + heartbeat_period=15, + heartbeat_threshold=120, + worker_debug=self.worker_debug, + cores_per_worker=self.cores_per_worker, + cpu_affinity="block-reverse", + prefetch_capacity=0, + provider=PBSProProvider( + launcher=MpiExecLauncher( + bind_cmd="--cpu-bind", + overrides="--depth=64 --ppn 1", + ), + account=self.account, + queue=self.queue, + # select_options='ngpus=4', # commented out + # PBS directives: for array jobs pass '-J' option + scheduler_options=self.scheduler_options, + # Command to be run before starting a worker, such as: + worker_init=self.worker_init, + # number of compute nodes allocated for each block + nodes_per_block=self.num_nodes, + init_blocks=1, + min_blocks=0, + max_blocks=1, # Increase to have more parallel jobs + cpus_per_node=self.cpus_per_node, + walltime=self.walltime, + ), + ), + ], + checkpoint_files=checkpoints, + run_dir=run_dir, + checkpoint_mode="task_exit", + retries=self.retries, + app_cache=True, + ) + + return config + + +ComputeSettingsTypes = Union[ + LocalSettings, + WorkstationSettings, + PolarisSettings, +] + + +def get_compute_settings(args: Namespace) -> ComputeSettingsTypes: + """Instantiate the appropriate ComputeSettings class based on provider type.""" + if args.parsl_config == "local": + return LocalSettings( + max_workers=1, # todo: parameterize + cores_per_worker=args.cores_per_worker, + ) + elif args.parsl_config == "workstation": + return WorkstationSettings( + cores_per_worker=args.cores_per_worker, + retries=args.retries, + ) + elif args.parsl_config == "polaris": + config_dict = { + k: v + for k, v in vars(args).items() + if k in PolarisSettings.model_fields and v is not None + } + return PolarisSettings(**config_dict) + else: + raise ValueError(f"Unknown config type: {args.parsl_config}") diff --git a/deduplication/workflows.py b/deduplication/workflows.py index f0e4be9..1be96f2 100644 --- a/deduplication/workflows.py +++ b/deduplication/workflows.py @@ -1,12 +1,21 @@ -from deduplication.minhash import MinHasher +import os +from functools import partial +from pathlib import Path +from typing import List + +from parsl.concurrent import ParslPoolExecutor + from deduplication.lsh import LSHIndex -from deduplication.lshbloom import LSHBloom +from deduplication.lshbloom import LSHBloom, run_LSHBloom +from deduplication.minhash import MinHasher from deduplication.writers import write_duplicates_to_csv -from typing import List -import os + +from .minhash import compute_minhash_for_file +from .parsl_conf import 
ComputeSettingsTypes # <<< MinHashLSH >>> + # workflow for deduping single corpus against the LSH Index def dedup_single_lsh( input_dir: str, @@ -35,7 +44,9 @@ def dedup_single_lsh( index = LSHIndex(minhash_dir, lsh_params) duplicates = index.deduplicate_corpus() - write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["corpus", "key", "dup_key"]) + write_duplicates_to_csv( + duplicates, csvfile, corpus_name, header=["corpus", "key", "dup_key"] + ) # workflow for deduping many corpora at once @@ -50,8 +61,9 @@ def dedup_multi_lsh( redis_port: int = 6379, compute_minhashes: bool = True, ): - assert len(input_dirs) == len(minhash_dirs) == len(corpus_names), \ - f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}" + assert ( + len(input_dirs) == len(minhash_dirs) == len(corpus_names) + ), f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}" for i in range(len(input_dirs)): dedup_single_lsh( @@ -101,9 +113,12 @@ def dedup_single_file_lsh( # <<< LSHBloom >>> + def clear_dir(save_dir): if os.path.exists(save_dir): - rm_files = [os.path.join(save_dir, f) for f in save_dir if ".bf" in f or '.csv' in f] + rm_files = [ + os.path.join(save_dir, f) for f in save_dir if ".bf" in f or ".csv" in f + ] for f in rm_files: os.remove(f) @@ -121,25 +136,67 @@ def dedup_single_bloom( save_dir: str = "./", compute_minhashes: bool = True, clear: bool = False, + parsl_config: ComputeSettingsTypes = None, + num_cpus: int = 32, ): if clear: clear_dir(save_dir) - + lsh_params = { "threshold": sim_threshold, "num_perm": n_hash_funcs, "n": corpus_size, "fp": false_positive_rate, - "save_dir": save_dir + "save_dir": save_dir, } - if compute_minhashes: - m = MinHasher(input_dir, minhash_dir, n_hash_funcs) - m.process() + if parsl_config is None: + # Compute minhashes if needed + if compute_minhashes: + m = MinHasher(input_dir, minhash_dir, n_hash_funcs, num_cpus) + m.process() - index = LSHBloom(minhash_dir, lsh_params) - duplicates = index.deduplicate_corpus() - write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"]) + # Deduplicate with Bloom Index + index = LSHBloom(minhash_dir, lsh_params) + duplicates = index.deduplicate_corpus() + write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"]) + + return + + elif parsl_config is not None: + config = parsl_config.get_config() + + with ParslPoolExecutor(config) as executor: + print(f"Parsl config: {parsl_config}") + if compute_minhashes: + print("Using Parsl to compute minhashes") + + minhash_func = partial( + compute_minhash_for_file, + output_dir=minhash_dir, + num_perm=n_hash_funcs, + num_cpus=parsl_config.cores_per_worker, + progress=False, + ) + Path(minhash_dir).mkdir(parents=True, exist_ok=True) + jsonl_files = list(Path(input_dir).glob("*.jsonl")) + print(f"Found {len(jsonl_files)} jsonl files to process") + + # Compute minhashes in parallel + list(executor.map(minhash_func, jsonl_files)) + print("Finished computing minhashes for all jsonl files") + + print("Running LSH Bloom deduplication") + bloom_func = partial( + run_LSHBloom, + lsh_params=lsh_params, + csvfile=csvfile, + corpus_name=corpus_name, + ) + executor.submit(bloom_func, minhash_dir).result() + print(f"Written duplicates to: {csvfile}") + + return # workflow for deduping many corpora against the Bloom Index at once @@ -155,27 +212,65 @@ def dedup_multi_bloom( save_dir: str = "./", 
compute_minhashes: bool = True, clear: bool = False, + parsl_config: ComputeSettingsTypes = None, ): - assert len(input_dirs) == len(minhash_dirs) == len(corpus_names), \ - f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}" + assert ( + len(input_dirs) == len(minhash_dirs) == len(corpus_names) + ), f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}" if clear: clear_dir(save_dir) - for i in range(len(input_dirs)): - dedup_single_bloom( - input_dirs[i], - minhash_dirs[i], - corpus_size, - false_positive_rate, - csvfile, - corpus_names[i], - sim_threshold, - n_hash_funcs, - save_dir, - compute_minhashes, - clear=False + if parsl_config is None: + # Compute in serial + for i in range(len(input_dirs)): + dedup_single_bloom( + input_dirs[i], + minhash_dirs[i], + corpus_size, + false_positive_rate, + csvfile, + corpus_names[i], + sim_threshold, + n_hash_funcs, + save_dir, + compute_minhashes, + clear=False, + ) + + elif parsl_config is not None: + # Need this function to alter the order of arguments to match + # what changes between each call + def dedup_single_wrapper(input_dir, minhash_dir, corpus_name, **kwargs): + # Call the original function with the correct mapping of arguments + dedup_single_bloom( + input_dir=input_dir, + minhash_dir=minhash_dir, + corpus_name=corpus_name, + **kwargs, # Pass the remaining fixed arguments + ) + + # Setup parsl config + config = parsl_config.get_config() + print(f"Parsl config: {parsl_config}") + + process_func = partial( + dedup_single_wrapper, + corpus_size=corpus_size, + false_positive_rate=false_positive_rate, + csvfile=csvfile, + sim_threshold=sim_threshold, + n_hash_funcs=n_hash_funcs, + save_dir=save_dir, + compute_minhashes=compute_minhashes, + clear=False, + parsl_config=None, # We don't want to use parsl within parsl + num_cpus=parsl_config.cores_per_worker, ) + print("Running LSH Bloom deduplication") + with ParslPoolExecutor(config) as executor: + list(executor.map(process_func, input_dirs, minhash_dirs, corpus_names)) + def dedup_single_file_bloom( input_file: str, @@ -189,6 +284,7 @@ def dedup_single_file_bloom( save_dir: str = "./", compute_minhashes: bool = True, clear: bool = False, + parsl_config: dict = None, ): if clear: clear_dir(save_dir) @@ -198,7 +294,7 @@ def dedup_single_file_bloom( "num_perm": n_hash_funcs, "n": corpus_size, "fp": false_positive_rate, - "save_dir": save_dir + "save_dir": save_dir, } if compute_minhashes:
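
A minimal usage sketch of the Parsl path added in this patch, assuming placeholder paths, account, queue, and walltime values. It mirrors the dispatch in `__main__.py`: build a `PolarisSettings` object from `deduplication/parsl_conf.py` and hand it to `dedup_single_bloom`, which then fans the per-file minhashing out to Parsl workers before building the Bloom index. The commented CLI invocation uses the flags defined in `deduplication/args.py`; everything not defined in the patch (paths, allocation name, environment setup) is a placeholder.

```
# Equivalent CLI invocation (placeholder paths and allocation):
#   python -m deduplication --mode bloom --single \
#       --input /data/corpus_a/jsonl --minhash-dir /data/corpus_a/minhashes \
#       --name corpus_a --output-file duplicates.csv \
#       --num 1000000 --fp 0.001 --save-dir ./bloom_index \
#       --parsl-config polaris --account MyProject --queue prod \
#       --walltime 01:00:00 --num-nodes 2 --cores-per-worker 4

from deduplication.parsl_conf import PolarisSettings
from deduplication.workflows import dedup_single_bloom

# Compute settings for Polaris@ALCF; account, queue, walltime, and worker_init
# below are placeholder values.
settings = PolarisSettings(
    account="MyProject",
    queue="prod",
    walltime="01:00:00",
    num_nodes=2,
    worker_init="module load conda; conda activate dedup",  # placeholder environment setup
)

# Minhash each JSONL file on Parsl workers, then deduplicate against the Bloom
# index and log duplicates to the CSV file (same flow __main__.py dispatches to).
dedup_single_bloom(
    input_dir="/data/corpus_a/jsonl",        # placeholder paths
    minhash_dir="/data/corpus_a/minhashes",
    corpus_size=1_000_000,
    false_positive_rate=0.001,
    csvfile="duplicates.csv",
    corpus_name="corpus_a",
    sim_threshold=0.8,
    n_hash_funcs=128,
    save_dir="./bloom_index",
    compute_minhashes=True,
    parsl_config=settings,
)
```

When `parsl_config` is left as `None`, the same call falls back to the serial path: `MinHasher` computes signatures with local multiprocessing and the Bloom deduplication runs in-process.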