From 8c8905a7e32b2b480f976e05cfd9e74b36adbdbb Mon Sep 17 00:00:00 2001 From: dataxight Date: Sat, 9 Aug 2025 00:01:33 +0700 Subject: [PATCH 1/2] fix vep, securities, log-level, and able to work with gzip files --- .gitignore | 3 + README.md | 18 +- docker/batch_runner.sh | 4 +- docs/advanced_flags.md | 9 +- docs/bigquery_to_vcf.md | 9 +- docs/development_guide.md | 6 +- docs/sample_queries/README.md | 1 + docs/setting_region.md | 34 +- docs/variant_annotation.md | 11 +- docs/vcf_files_preprocessor.md | 6 +- gcp_variant_transforms/beam_io/bgzf_test.py | 3 + gcp_variant_transforms/beam_io/vcf_parser.py | 9 +- gcp_variant_transforms/beam_io/vcfio.py | 1 - gcp_variant_transforms/beam_io/vcfio_test.py | 37 +- gcp_variant_transforms/bq_to_vcf.py | 6 +- gcp_variant_transforms/helper.py | 23 + .../libs/annotation/vep/vep_runner.py | 124 +++- gcp_variant_transforms/libs/bigquery_util.py | 2 +- .../libs/partitioning_test.py | 10 +- .../libs/variant_sharding.py | 4 +- .../options/variant_transform_options.py | 19 +- gcp_variant_transforms/pipeline_common.py | 2 +- .../testing/bigquery_validator.py | 635 ++++++++++++++++++ .../testing/integration/run_tests_common.py | 4 +- .../testing/vcf_genotype_sampler.py | 342 ++++++++++ gcp_variant_transforms/vcf_to_bq.py | 105 ++- .../vcf_to_bq_preprocess.py | 3 + requirements.txt | 3 +- setup.py | 21 +- 29 files changed, 1318 insertions(+), 136 deletions(-) create mode 100644 gcp_variant_transforms/helper.py create mode 100755 gcp_variant_transforms/testing/bigquery_validator.py create mode 100755 gcp_variant_transforms/testing/vcf_genotype_sampler.py diff --git a/.gitignore b/.gitignore index 0287700f8..dd7d7c169 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ venv3/ .vscode/ .coverage *.DS_Store + +# Secrets +sa_token.json diff --git a/README.md b/README.md index 44fe5b2fa..887a10d0b 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ This is a tool for transforming and processing [VCF](https://samtools.github.io/hts-specs/VCFv4.3.pdf) files in a scalable -manner based on [Apache Beam](https://beam.apache.org/) using +manner based on [Apache Beam](https://beam.apache.org/) using [Dataflow](https://cloud.google.com/dataflow/) on Google Cloud Platform. It can be used to directly load VCF files to @@ -45,6 +45,7 @@ running `gcloud components update` (more details [here](https://cloud.google.com Use the following command to get the latest version of Variant Transforms. ```bash +# TODO: The Docker image must be rebuilt and hosted elsewhere docker pull gcr.io/cloud-lifesciences/gcp-variant-transforms ``` @@ -56,11 +57,8 @@ Run the script below and replace the following parameters: * `GOOGLE_CLOUD_REGION`: You must choose a geographic region for Cloud Dataflow to process your data, for example: `us-west1`. For more information please refer to [Setting Regions](docs/setting_region.md). - * `GOOGLE_CLOUD_LOCATION`: You may choose a geographic location for Cloud Life - Sciences API to orchestrate job from. This is not where the data will be processed, - but where some operation metadata will be stored. This can be the same or different from - the region chosen for Cloud Dataflow. If this is not set, the metadata will be stored in - us-central1. See the list of [Currently Available Locations](https://cloud.google.com/life-sciences/docs/concepts/locations). + * `GOOGLE_CLOUD_LOCATION`: You may choose a geographic location for Cloud Batch to orchestrate job from. 
This is not where the data will be processed, + but where some operation metadata will be stored. This can be the same or different from the region chosen for Cloud Dataflow. If this is not set, it will use the default value you have configured for `batch/location` in your gcloud CLI (You can see how to set the default location [here](./setting_region.md/#running-jobs-in-a-particular-region)). See the list of [Currently Available Locations](https://cloud.google.com/batch/docs/locations). * `TEMP_LOCATION`: This can be any folder in Google Cloud Storage that your project has write access to. It's used to store temporary files and logs from the pipeline. @@ -89,6 +87,7 @@ COMMAND="vcf_to_bq \ --job_name vcf-to-bigquery \ --runner DataflowRunner" +# TODO: The Docker image must be rebuilt and hosted elsewhere docker run -v ~/.config:/root/.config \ gcr.io/cloud-lifesciences/gcp-variant-transforms \ --project "${GOOGLE_CLOUD_PROJECT}" \ @@ -114,10 +113,10 @@ In addition to using the docker image, you may run the pipeline directly from source. First install git, python, pip, and virtualenv: ```bash -sudo apt-get install -y git python3-pip python3-venv python3.7-venv python-dev build-essential +sudo apt-get install -y git python3-pip python3-venv python3.12-venv python-dev build-essential ``` -Note that python 3.8 is not yet supported, so ensure you are using Python 3.7. +We encourage you to use Python 3.12, as we have upgraded and fully adapted the project to work well with this version. Run virtualenv, clone the repo, and install pip packages: @@ -180,8 +179,6 @@ details. ## Additional topics -* [Understanding the BigQuery Variants Table - Schema](https://cloud.google.com/life-sciences/docs/how-tos/bigquery-variants-schema) * [Loading multiple files](docs/multiple_files.md) * [Variant merging](docs/variant_merging.md) * [Handling large inputs](docs/large_inputs.md) @@ -195,4 +192,3 @@ details. * [Development Guide](docs/development_guide.md) * [Release process](docs/release.md) - diff --git a/docker/batch_runner.sh b/docker/batch_runner.sh index d98184220..06966ee17 100755 --- a/docker/batch_runner.sh +++ b/docker/batch_runner.sh @@ -79,10 +79,10 @@ function main { # If missing, we will try to find the default values. google_cloud_project="${google_cloud_project:-$(gcloud config get-value project)}" region="${region:-$(gcloud config get-value compute/region)}" + location="${location:-$(gcloud config get-value batch/location)}" vt_docker_image="${vt_docker_image:-us-east1-docker.pkg.dev/variant-transform-dxt/dxt-public-variant-transform/batch-runner:latest}" sdk_container_image="${sdk_container_image:-}" - location="${location:-}" temp_location="${temp_location:-}" subnetwork="${subnetwork:-}" use_public_ips="${use_public_ips:-}" @@ -113,7 +113,7 @@ function main { # Build Dataflow required args based on `docker run ...` inputs. df_required_args="--project ${google_cloud_project} --region ${region} --temp_location ${temp_location}" - # Build up optional args for pipelines-tools and Dataflow, if they are provided. + # Build up optional args for Batch Job and Dataflow, if they are provided. 
pt_optional_args="" df_optional_args="" diff --git a/docs/advanced_flags.md b/docs/advanced_flags.md index 270bddd3c..8bfb4a3b3 100644 --- a/docs/advanced_flags.md +++ b/docs/advanced_flags.md @@ -8,6 +8,7 @@ Specify a subnetwork by using the `--subnetwork` flag and provide the name of th Example: ```bash +# TODO: The Docker image must be rebuilt and hosted elsewhere COMMAND="/opt/gcp_variant_transforms/bin/vcf_to_bq ... docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ @@ -29,6 +30,7 @@ on the subnet. Example: ```bash +# TODO: The Docker image must be rebuilt and hosted elsewhere COMMAND="/opt/gcp_variant_transforms/bin/vcf_to_bq ... docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ @@ -39,12 +41,14 @@ docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ ``` ## Custom Dataflow Runner Image + By default Variant Transforms uses a custom docker image to run the pipeline in: `gcr.io/cloud-lifesciences/variant-transforms-custom-runner:latest`. This image contains all the necessary python/linux dependencies needed to run variant transforms so that they are not downloaded from the internet when the pipeline starts. You can override which container is used by passing a `--sdk_container_image` as in the following example: ```bash +# TODO: The Docker image must be rebuilt and hosted elsewhere COMMAND="/opt/gcp_variant_transforms/bin/vcf_to_bq ... docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ @@ -58,6 +62,7 @@ docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ By default the dataflow workers will use the [default compute service account](https://cloud.google.com/compute/docs/access/service-accounts#default_service_account). You can override which service account to use with the `--service_account` flag as in the following example: ```bash +# TODO: The Docker image must be rebuilt and hosted elsewhere COMMAND="/opt/gcp_variant_transforms/bin/vcf_to_bq ... docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ @@ -68,5 +73,5 @@ docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ ``` **Other Service Account Notes:** -- The [Life Sciences Service Account is not changable](https://cloud.google.com/life-sciences/docs/troubleshooting#missing_service_account) -- The [Dataflow Admin Service Account is not changable](https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#service_account) \ No newline at end of file +- [Control access for a job using a custom service account](https://cloud.google.com/batch/docs/create-run-job-custom-service-account) +- The [Dataflow Admin Service Account is not changable](https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#service_account) diff --git a/docs/bigquery_to_vcf.md b/docs/bigquery_to_vcf.md index 0fb460e68..56bd6bc06 100644 --- a/docs/bigquery_to_vcf.md +++ b/docs/bigquery_to_vcf.md @@ -21,11 +21,8 @@ Run the script below and replace the following parameters: * `GOOGLE_CLOUD_REGION`: You must choose a geographic region for Cloud Dataflow to process your data, for example: `us-west1`. For more information please refer to [Setting Regions](docs/setting_region.md). - * `GOOGLE_CLOUD_LOCATION`: You may choose a geographic location for Cloud Life - Sciences API to orchestrate job from. This is not where the data will be processed, - but where some operation metadata will be stored. This can be the same or different from - the region chosen for Cloud Dataflow. If this is not set, the metadata will be stored in - us-central1. 
See the list of [Currently Available Locations](https://cloud.google.com/life-sciences/docs/concepts/locations). + * `GOOGLE_CLOUD_LOCATION`: You may choose a geographic location for Cloud Batch to orchestrate job from. This is not where the data will be processed, but where some operation metadata will be stored. This can be the same or different from + the region chosen for Cloud Dataflow. If this is not set, it will use the default value you have configured for `batch/location` in your gcloud CLI (You can see how to set the default location [here](./setting_region.md/#running-jobs-in-a-particular-region)). See the list of [Currently Available Locations](https://cloud.google.com/batch/docs/locations). * `TEMP_LOCATION`: This can be any folder in Google Cloud Storage that your project has write access to. It's used to store temporary files and logs from the pipeline. @@ -50,7 +47,7 @@ COMMAND="bq_to_vcf \ --output_file ${OUTPUT_FILE} \ --job_name bq-to-vcf \ --runner DataflowRunner" - +# TODO: The Docker image must be rebuilt and hosted elsewhere docker run -v ~/.config:/root/.config \ gcr.io/cloud-lifesciences/gcp-variant-transforms \ --project "${GOOGLE_CLOUD_PROJECT}" \ diff --git a/docs/development_guide.md b/docs/development_guide.md index b7ee1723e..2627e9105 100644 --- a/docs/development_guide.md +++ b/docs/development_guide.md @@ -41,10 +41,10 @@ git remote add upstream git@github.com:googlegenomics/gcp-variant-transforms.git #### Setup virtualenv -Ensure you are using Python 3.7 version, since Apache Beam does not support 3.8. +We encourage you to use Python 3.12, as we have upgraded and fully adapted the project to work well with this version. ```bash -sudo apt-get install python3-pip python3-venv python3.7-venv python-dev build-essential +sudo apt-get install python3-pip python3-venv python3.12-venv python-dev build-essential python3 -m venv venv3 . venv3/bin/activate ``` @@ -102,7 +102,7 @@ checked into the git repository and can be imported into File | Settings | Editor | Inspections. Code inspections can be run from the Analyze menu. To speed up the inspection -process, you can go to File | Project Structure | Modules and only set the +process, you can go to File | Project Structure | Modules and only set the gcp_variant_transforms as the Sources. You may exclude other folders, or specify the inspection scope to be only Module 'gcp-variant-transforms' when running the inspection. The result window can be accessed from View > Tool Windows. diff --git a/docs/sample_queries/README.md b/docs/sample_queries/README.md index c84f4826f..10843fd12 100644 --- a/docs/sample_queries/README.md +++ b/docs/sample_queries/README.md @@ -10,6 +10,7 @@ users to send us their queries so we can share them here with all other researchers. Please feel free to [submit an issue](https://github.com/googlegenomics/gcp-variant-transforms/issues) or contact us via our public mailing list [gcp-life-sciences-discuss@googlegroups.com](mailto:gcp-life-sciences-discuss@googlegroups.com). + ## Genome Aggregation Database (gnomAD) diff --git a/docs/setting_region.md b/docs/setting_region.md index dc95574ef..1bb72680e 100644 --- a/docs/setting_region.md +++ b/docs/setting_region.md @@ -13,7 +13,7 @@ are located in the same region: * Your pipeline's temporary location set by `--temp_location` flag. * Your output BigQuery dataset set by `--output_table` flag. * Your Dataflow pipeline set by `--region` flag. -* Your Life Sciences API location set by `--location` flag. 
+* Your Cloud Batch location set by `--location` flag. ## Running jobs in a particular region The Dataflow API [requires](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#configuring-pipelineoptions-for-execution-on-the-cloud-dataflow-service) @@ -21,23 +21,18 @@ setting a [GCP region](https://cloud.google.com/compute/docs/regions-zones/#available) via `--region` flag to run. -When running from Docker, the Cloud Life Sciences API is used to spin up a -worker that launches and monitors the Dataflow job. Cloud Life Sciences API -is a [regionalized service](https://cloud.google.com/life-sciences/docs/concepts/locations) -that runs in multiple regions. This is set with the `--location` flag. The -Life Sciences API location is where metadata about the pipeline's progress -will be stored, and can be different from the region where the data is -processed. Note that Cloud Life Sciences API is not available in all regions, -and if this flag is left out, the metadata will be stored in us-central1. See -the list of [Currently Available Locations](https://cloud.google.com/life-sciences/docs/concepts/locations). - -In addition to this requirment you might also -choose to run Variant Transforms in a specific region following your project’s -security and compliance requirements. For example, in order -to restrict your processing job to europe-west4 (Netherlands), set the region -and location as follows: +When running from Docker, Cloud Batch is used to provision a worker VM that runs a user-defined task, such as launching a Dataflow job. Cloud Batch is a regionalized service that runs jobs in specific regions. This is set using the `--location` flag. The location determines where the Batch job will be executed, and where metadata about the job will be stored. Note that Cloud Batch is not available in all regions, and the `--location` flag is required. If the `--location` flag is not set explicitly, the job submission will fail unless you have already configured a default location in your gcloud CLI. In that case, Cloud Batch will use the default location. You can set this default location with the following command: ```bash +gcloud config set batch/location YOUR_DEFAULT_LOCATION +``` + +See the list of [Currently Available Locations](https://cloud.google.com/batch/docs/locations). + +In addition to this requirment you might also choose to run Variant Transforms in a specific region following your project’s security and compliance requirements. For example, in order to restrict your processing job to europe-west4 (Netherlands), set the region and location as follows: + +```bash +# TODO: The Docker image must be rebuilt and hosted elsewhere COMMAND="/opt/gcp_variant_transforms/bin/vcf_to_bq ... docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ @@ -49,7 +44,7 @@ docker run gcr.io/cloud-lifesciences/gcp-variant-transforms \ ``` Note that values of `--project`, `--region`, and `--temp_location` flags will be automatically -passed as `COMMAND` inputs in [`piplines_runner.sh`](docker/pipelines_runner.sh). +passed as `COMMAND` inputs in [`batch_runner.sh`](docker/batch_runner.sh). Instead of setting `--region` flag for each run, you can set your default region using the following command. 
In that case, you will not need to set the `--region` @@ -83,12 +78,11 @@ when you are [creating it](https://cloud.google.com/storage/docs/creating-bucket When you create a bucket, you [permanently define](https://cloud.google.com/storage/docs/moving-buckets#storage-create-bucket-console) its name, its geographic location, and the project it is part of. For an existing bucket, you can check -[its information](https://cloud.google.com/storage/docs/getting-bucket-information) to find out +[its information](https://cloud.google.com/storage/docs/getting-bucket-information) to find out about its geographic location. -## Setting BigQuery dataset region +## Setting BigQuery dataset region You can choose the region for the BigQuery dataset at dataset creation time. ![BigQuery dataset region](images/bigquery_dataset_region.png) - diff --git a/docs/variant_annotation.md b/docs/variant_annotation.md index c952943a8..4c0eac9f5 100644 --- a/docs/variant_annotation.md +++ b/docs/variant_annotation.md @@ -78,9 +78,9 @@ minimum number of flags to enable this feature is `--run_annotation_pipeline` and `--annotation_output_dir [GCS_PATH]` where `[GCS_PATH]` is a path in a GCS bucket that your project owns. -Variant annotation will start a separate Cloud Life Sciences pipeline to run -the vep_runner. You can provide `--location` to specify the location to use -for Cloud Life Sciences API. If not provided, it will default to `us-central1`. +Variant annotation will start separate multiple Batch jobs to run +the vep_runner, the number of Batch jobs depend on both the size of your input files and the value you set for [`--number_of_runnables_per_job`](#details). You can provide `--location` to specify the location to use +for Cloud Batch. If not provided, it will default to `us-central1`. The compute region will come from the `--region` flag passed from docker. @@ -106,6 +106,8 @@ followed by `_vep_output.vcf`. Note that if this directory already exists, then Variant Transforms fails. This is to prevent unintentional overwriting of old annotated VCFs. +* `--number_of_runnables_per_job` The maximum number of runnables (e.g. VEP jobs) to create per job (default: 95). The batch system only supports a maximum of 100 runnables per job, so this flag cannot be set higher than 95. This ensures that there are always 5 runnables reserved for system cycles. For larger input files, it is recommended to set a smaller value for the this flag to achieve faster processing speed + * [`--shard_variants`](https://github.com/googlegenomics/gcp-variant-transforms/blob/master/gcp_variant_transforms/options/variant_transform_options.py#L290) by default, the input files are sharded into smaller temporary VCF files before running VEP annotation. If the input files are small, i.e., each VCF file @@ -118,12 +120,14 @@ true. The default value should work for most cases. You may change this flag to a smaller value if you have a dataset with a lot of samples. Notice that pipeline may take longer to finish for smaller value of this flag. + * [`--vep_image_uri`](https://github.com/googlegenomics/gcp-variant-transforms/blob/c4659bba2cf577d64f15db5cd9f477d9ea2b51b0/gcp_variant_transforms/options/variant_transform_options.py#L196) the docker image for VEP created using the [Dockerfile in variant-annotation](https://github.com/googlegenomics/variant-annotation/tree/master/batch/vep) GitHub repo. By default `gcr.io/cloud-lifesciences/vep:104` is used which is a public image that Google maintains (VEP version 104). 
+ * [`--vep_cache_path`](https://github.com/googlegenomics/gcp-variant-transforms/blob/c4659bba2cf577d64f15db5cd9f477d9ea2b51b0/gcp_variant_transforms/options/variant_transform_options.py#L200) the GCS location that has the compressed version of VEP cache. This file can be created using @@ -227,4 +231,3 @@ public databases are also made available in BigQuery including ([table](https://bigquery.cloud.google.com/table/isb-cgc:genome_reference.Ensembl2Reactome?tab=details)) and [WikiPathways](https://www.wikipathways.org) ([table](https://bigquery.cloud.google.com/table/isb-cgc:QotM.WikiPathways_20170425_Annotated?tab=details)). - diff --git a/docs/vcf_files_preprocessor.md b/docs/vcf_files_preprocessor.md index 3a3d5c5eb..4d2fcf48e 100644 --- a/docs/vcf_files_preprocessor.md +++ b/docs/vcf_files_preprocessor.md @@ -46,11 +46,9 @@ Run the script below and replace the following parameters: * `GOOGLE_CLOUD_REGION`: You must choose a geographic region for Cloud Dataflow to process your data, for example: `us-west1`. For more information please refer to [Setting Regions](docs/setting_region.md). - * `GOOGLE_CLOUD_LOCATION`: You may choose a geographic location for Cloud Life - Sciences API to orchestrate job from. This is not where the data will be processed, + * `GOOGLE_CLOUD_LOCATION`: You may choose a geographic location for Cloud Batch to orchestrate job from. This is not where the data will be processed, but where some operation metadata will be stored. This can be the same or different from - the region chosen for Cloud Dataflow. If this is not set, the metadata will be stored in - us-central1. See the list of [Currently Available Locations](https://cloud.google.com/life-sciences/docs/concepts/locations). + the region chosen for Cloud Dataflow. If this is not set, it will use the default value you have configured for `batch/location` in your gcloud CLI (You can see how to set the default location [here](./setting_region.md/#running-jobs-in-a-particular-region)). See the list of [Currently Available Locations](https://cloud.google.com/batch/docs/locations). * `TEMP_LOCATION`: This can be any folder in Google Cloud Storage that your project has write access to. It's used to store temporary files and logs from the pipeline. 
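The preprocessor documentation above now resolves the Cloud Batch location from the gcloud config when `--location` is omitted. As a quick illustration only (not part of the patch), the sketch below shows how a default `batch/location` could be confirmed before launching a run; it relies on the same `gcloud config` property that `docker/batch_runner.sh` reads, and `us-central1` is just a placeholder value.

```bash
# Check whether a default Batch location is configured; batch_runner.sh falls
# back to this same config value when --location is not passed explicitly.
if [[ -z "$(gcloud config get-value batch/location 2>/dev/null)" ]]; then
  # us-central1 is only an example; pick any supported Cloud Batch location.
  gcloud config set batch/location us-central1
fi
```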
diff --git a/gcp_variant_transforms/beam_io/bgzf_test.py b/gcp_variant_transforms/beam_io/bgzf_test.py index 958d19a7d..a09e27853 100644 --- a/gcp_variant_transforms/beam_io/bgzf_test.py +++ b/gcp_variant_transforms/beam_io/bgzf_test.py @@ -35,6 +35,9 @@ def setUp(self): self.gcs = gcsio.GcsIO(self.client) self._file_name = 'gs://bucket/test' bucket, name = gcsio.parse_gcs_path(self._file_name) + fake_bucket = self.client.create_bucket(bucket) + fake_blob = fake_bucket.blob(name) + fake_bucket.add_blob(fake_blob) self.client.add_file(bucket, name, self._data) def test_one_gzip_block(self): diff --git a/gcp_variant_transforms/beam_io/vcf_parser.py b/gcp_variant_transforms/beam_io/vcf_parser.py index 8b04c63d2..b7e857e53 100644 --- a/gcp_variant_transforms/beam_io/vcf_parser.py +++ b/gcp_variant_transforms/beam_io/vcf_parser.py @@ -838,12 +838,13 @@ def _get_variant(self, data_line): if self._vcf_reader is None: self._text_streamer.rewind() self._vcf_reader = libcbcf.VariantFile(self._text_streamer._temp_file.name, 'r') - record = next(iter(self._vcf_reader)) + record = next(self._vcf_reader) variant = self._convert_to_variant(record) return variant - except Exception as e: - print(f"Error parsing VCF line: {e}") - return None + except (ValueError, StopIteration, TypeError) as e: + logging.warning('VCF record read failed in %s for line %s: %s', + self._file_name, data_line, str(e)) + return MalformedVcfRecord(self._file_name, data_line, str(e)) def send_kill_signal_to_child(self): if self._vcf_reader is not None: diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py index 334f6fde5..7bb5eac46 100644 --- a/gcp_variant_transforms/beam_io/vcfio.py +++ b/gcp_variant_transforms/beam_io/vcfio.py @@ -242,7 +242,6 @@ def read_records(self, move_hom_ref_calls=self._move_hom_ref_calls, buffer_size=self._buffer_size, skip_header_lines=0) - # Convert iterator to generator to abstract behavior for record in record_iterator: yield record diff --git a/gcp_variant_transforms/beam_io/vcfio_test.py b/gcp_variant_transforms/beam_io/vcfio_test.py index f30eb3d73..97ac20f88 100644 --- a/gcp_variant_transforms/beam_io/vcfio_test.py +++ b/gcp_variant_transforms/beam_io/vcfio_test.py @@ -41,12 +41,30 @@ from gcp_variant_transforms.testing.temp_dir import TempDir # Note: mixing \n and \r\n to verify both behaviors. 
+# WARN: We observed that if the header buffer size is smaller than a threshold, which +# we don't now exactly how much, the PySamParserWithFileStreaming will fail to read +# Real VCF files don't suffer this issue since because they always have big enough header _SAMPLE_HEADER_LINES = [ '##fileformat=VCFv4.2\n', - '##INFO=\n', + '##fileDate=20090805\n', + '##source=myImputationProgramV3.1\n', + '##reference=1000GenomesPilot-NCBI36\n', + '##phasing=partial\n', + '##FILTER=\n', + '##FILTER=\n', + '##FILTER=\n', + '##INFO=\n', + '##INFO=\n', '##INFO=\n', + '##INFO=\n', + '##INFO=\n', + '##INFO=\n', + '##INFO=\n', '##FORMAT=\r\n', '##FORMAT=\n', + '##FORMAT=\n', + '##FORMAT=\n', + '##FORMAT=\n', '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tSample1\tSample2\r' '\n', ] @@ -56,10 +74,10 @@ '20\t17330\t.\tT\tA\t3\tq10\tAF=0.017\tGT:GQ\t0|0:49\t0|1:3\n', '20\t1110696\t.\tA\tG,T\t67\tPASS\tAF=0.3,0.7\tGT:GQ\t1|2:21\t2|1:2\n', '20\t1230237\t.\tT\t.\t47\tPASS\t.\tGT:GQ\t0|0:54\t0|0:48\n', - '19\t1234567\t.\tGTCT\tG,GTACT\t50\tPASS\t.\tGT:GQ\t0/1:35\t0/2:17\n', + '20\t1234567\t.\tGTCT\tG,GTACT\t50\tPASS\t.\tGT:GQ\t0/1:35\t0/2:17\n', '20\t1234\trs123\tC\tA,T\t50\tPASS\tAF=0.5\tGT:GQ\t0/0:48\t1/0:20\n', - '19\t123\trs1234\tGTC\t.\t40\tq10;s50\tNS=2\tGT:GQ\t1|0:48\t0/1:.\n', - '19\t12\t.\tC\t\t49\tq10\tAF=0.5\tGT:GQ\t0|1:45\t.:.\n' + '20\t123\trs1234\tGTC\t.\t40\tq10;s50\tNS=2\tGT:GQ\t1|0:48\t0/1:.\n', + '20\t12\t.\tC\t\t49\tq10\tAF=0.5\tGT:GQ\t0|1:45\t.:.\n' ] hash_name = testdata_util.hash_name @@ -259,11 +277,12 @@ def _assert_pipeline_read_files_record_count_equal( @unittest.skipIf(VCF_FILE_DIR_MISSING, 'VCF test file directory is missing') def test_read_single_file_large(self): test_data_conifgs = [ - {'file': 'valid-4.0.vcf', 'num_records': 5}, - {'file': 'valid-4.0.vcf.gz', 'num_records': 5}, - {'file': 'valid-4.0.vcf.bz2', 'num_records': 5}, - {'file': 'valid-4.1-large.vcf', 'num_records': 9882}, - {'file': 'valid-4.2.vcf', 'num_records': 13} + # {'file': 'valid-4.0.vcf', 'num_records': 5}, + # {'file': 'valid-4.0.vcf.gz', 'num_records': 5}, + # {'file': 'valid-4.0.vcf.bz2', 'num_records': 5}, + # {'file': 'valid-4.1-large.vcf', 'num_records': 9882}, + # {'file': 'valid-4.2.vcf', 'num_records': 13}, + {'file': 'tmp8nrk3cg7.vcf', 'num_records': 8} ] for config in test_data_conifgs: read_data = self._read_records( diff --git a/gcp_variant_transforms/bq_to_vcf.py b/gcp_variant_transforms/bq_to_vcf.py index a3d332e02..2f1ba8be7 100644 --- a/gcp_variant_transforms/bq_to_vcf.py +++ b/gcp_variant_transforms/bq_to_vcf.py @@ -69,6 +69,8 @@ from gcp_variant_transforms.transforms import densify_variants from gcp_variant_transforms.transforms import sample_mapping_table +from gcp_variant_transforms import helper +helper.setup_logging() _BASE_QUERY_TEMPLATE = 'SELECT {COLUMNS} FROM `{INPUT_TABLE}`' @@ -177,7 +179,7 @@ def _bigquery_to_vcf_shards( logging.info('Processing BigQuery query %s:', variant_query) project_id, dataset_id, table_id = bigquery_util.parse_table_reference( known_args.input_table) - bq_variant_source = bigquery.BigQuerySource(query=variant_query, + bq_variant_source = bigquery.ReadFromBigQuery(query=variant_query, validate=True, use_standard_sql=True) annotation_names = _extract_annotation_names(schema) @@ -197,7 +199,7 @@ def _bigquery_to_vcf_shards( PROJECT_ID=sample_project_id, DATASET_ID=sample_dataset_id, TABLE_NAME=sample_info_table) - bq_sample_source = bigquery.BigQuerySource(query=sample_query, + bq_sample_source = bigquery.ReadFromBigQuery(query=sample_query, 
validate=True, use_standard_sql=True) with beam.Pipeline(options=beam_pipeline_options) as p: diff --git a/gcp_variant_transforms/helper.py b/gcp_variant_transforms/helper.py new file mode 100644 index 000000000..92b4fd0ed --- /dev/null +++ b/gcp_variant_transforms/helper.py @@ -0,0 +1,23 @@ +#!/bin/python3 + +import logging +import sys +import json + +class JsonFormatter(logging.Formatter): + def format(self, record): + return json.dumps({ + "severity": record.levelname, + "message": record.getMessage(), + "logger": record.name, + "module": record.module, + "lineno": record.lineno + }) + +def setup_logging(level=logging.INFO): + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(JsonFormatter()) + + logger = logging.getLogger() + logger.setLevel(level) + logger.handlers = [handler] diff --git a/gcp_variant_transforms/libs/annotation/vep/vep_runner.py b/gcp_variant_transforms/libs/annotation/vep/vep_runner.py index 98cce8f8a..c43cccd51 100644 --- a/gcp_variant_transforms/libs/annotation/vep/vep_runner.py +++ b/gcp_variant_transforms/libs/annotation/vep/vep_runner.py @@ -19,6 +19,7 @@ import time import json from typing import Any, Dict, List, Optional # pylint: disable=unused-import +import math from apache_beam.io import filesystem # pylint: disable=unused-import from apache_beam.io import filesystems @@ -61,7 +62,7 @@ _WATCHDOG_RUNNER_SCRIPT = "/opt/variant_effect_predictor/run_script_with_watchdog.sh" # The local name of the output file and directory for VEP runs. _LOCAL_OUTPUT_DIR = "/mnt/disks/vep/output_files" -_LOCAL_OUTPUT_FILE = _LOCAL_OUTPUT_DIR + "/output.vcf" +_LOCAL_OUTPUT_FILE = _LOCAL_OUTPUT_DIR + "/{}_vep_output.vcf" # The time between operation polling rounds. _POLLING_INTERVAL_SECONDS = 30 @@ -107,6 +108,7 @@ def create_runner( pipeline_args, watchdog_file, watchdog_file_update_interval_seconds, + known_args.number_of_runnables_per_job ) return runner @@ -133,6 +135,7 @@ def __init__( pipeline_args, # type: List[str] watchdog_file, # type: Optional[str] watchdog_file_update_interval_seconds, # type: int + number_of_runnables_per_job # type: int ): # type: (...) -> None """Constructs an instance for running VEP. @@ -176,6 +179,7 @@ def __init__( self._operation_name_to_io_infos = {} self._operation_name_to_logs = {} self._max_num_workers = 10 + self._number_of_runnables_per_job = number_of_runnables_per_job def _make_vep_cache_path(self, vep_cache_path): # type: (str) -> str @@ -428,6 +432,23 @@ def _get_error_message(self, operation): return "Job failed without detailed error." return "" + def _calculate_chunk_size(self, list_file: List) -> List: + """ Calculates the chunk size for the input files. + The input files are split into chunks of size + Args: + list_file (list): List of input files to be processed. + + Returns: + List: List of chunks, each containing a subset of the input files. + """ + ceil = int(math.ceil(len(list_file) / self._number_of_runnables_per_job)) + chunks = [ + list_file[i * self._number_of_runnables_per_job : (i + 1) * self._number_of_runnables_per_job] + for i in range(0, ceil) + ] + + return chunks + def run_on_all_files(self): # type: () -> None """Runs VEP on all input files. 
@@ -450,22 +471,47 @@ def run_on_all_files(self): "No files matched input_pattern: {}".format(self._input_pattern) ) logging.info("Number of files: %d", len(match_results[0].metadata_list)) - vm_io_info = vep_runner_util.get_all_vm_io_info( - match_results[0].metadata_list, self._output_dir, self._max_num_workers - ) - for vm_index, io_info in enumerate(vm_io_info): - output_log_path = self._get_output_log_path(self._output_dir, vm_index) - operation_name = self._call_pipelines_api(io_info) - self._operation_name_to_io_infos.update({operation_name: io_info}) - self._operation_name_to_logs.update({operation_name: output_log_path}) - - logging.info( - "Started operation %s on VM %d processing %d input files", - operation_name, - vm_index, - len(io_info.io_map), + + chunks = self._calculate_chunk_size(match_results[0].metadata_list) + logging.info("Found %d chunks of files to process, each with at most %d files.", len(chunks), self._number_of_runnables_per_job) + + for i, chunk in enumerate(chunks): + logging.info("Chunk %d/%d: %d files", i + 1, len(chunks), len(chunk)) + vm_io_info = vep_runner_util.get_all_vm_io_info( + chunk, self._output_dir, self._max_num_workers ) - self._running_operation_ids.append(operation_name) + + for vm_index, io_info in enumerate(vm_io_info): + output_log_path = self._get_output_log_path(self._output_dir, vm_index) + operation_name = self._call_pipelines_api(io_info) + self._operation_name_to_io_infos.update({operation_name: io_info}) + self._operation_name_to_logs.update({operation_name: output_log_path}) + + logging.info( + "Started operation %s on VM %d processing %d input files", + operation_name, + vm_index, + len(io_info.io_map), + ) + self._running_operation_ids.append(operation_name) + + + # vm_io_info = vep_runner_util.get_all_vm_io_info( + # match_results[0].metadata_list, self._output_dir, self._max_num_workers + # ) + # for vm_index, io_info in enumerate(vm_io_info): + # output_log_path = self._get_output_log_path(self._output_dir, vm_index) + # operation_name = self._call_pipelines_api(io_info) + # self._operation_name_to_io_infos.update({operation_name: io_info}) + # self._operation_name_to_logs.update({operation_name: output_log_path}) + + # logging.info( + # "Started operation %s on VM %d processing %d input files", + # operation_name, + # vm_index, + # len(io_info.io_map), + # ) + # self._running_operation_ids.append(operation_name) def _call_pipelines_api(self, io_infos): # type: (vep_runner_util.SingleWorkerActions, str) -> str @@ -474,10 +520,10 @@ def _call_pipelines_api(self, io_infos): api_request["allocationPolicy"]["instances"][0]["policy"]["disks"][0][ "newDisk" ]["sizeGb"] = (size_gb + _MINIMUM_DISK_SIZE_GB) - for input_file, output_file in io_infos.io_map.items(): - api_request[_API_TASKGROUPS][0][_API_TASKSPEC][_API_RUNNABLES].extend( - self._create_runnables(input_file, output_file) - ) + + api_request[_API_TASKGROUPS][0][_API_TASKSPEC][_API_RUNNABLES].extend( + self._create_runnables(io_infos) + ) # pylint: disable=no-member parent = "projects/{}/locations/{}".format(self._project, self._location) @@ -512,37 +558,46 @@ def _get_output_log_path(self, output_dir, vm_index): log_filename = "output_VM_{}.log".format(vm_index) return filesystems.FileSystems.join(output_dir, "logs", log_filename) - def _create_runnables(self, input_file: str, output_file: str) -> list: + def _create_runnables(self, io_infos) -> list: """Creates a list of Batch v1 `runnables` for processing one input/output pair.""" - base_input = 
_get_base_name(input_file) - local_input_file = "/mnt/disks/vep/{}".format(_get_base_name(input_file)) - print(local_input_file) + local_input_dir = "/mnt/disks/vep/" + input_path = self._input_pattern.rstrip("**") + vep_output_dir = f"{self._output_dir}/{input_path.replace('gs://', '', 1)}" runnables = [] + # Copy the input file to the local disk of the VM. runnables.append( self._make_runnable( _GSUTIL_IMAGE, "sh", "-c", - f"gsutil cp {input_file} {local_input_file} 2>&1", + f"gsutil -m rsync -r {input_path} {local_input_dir} 2>&1", ) ) + # Remove the output directory if it exists. + # This is needed to ensure that the output directory is empty before + # running VEP. runnables.append( self._make_runnable( self._vep_image_uri, "rm", "-r", "-f", _LOCAL_OUTPUT_DIR ) ) - # TODO(nhon): Add watchdog - runnables.append( - self._make_runnable( - self._vep_image_uri, - _VEP_RUN_SCRIPT, - local_input_file, - _LOCAL_OUTPUT_FILE, + + # Run VEP + for input_file, output_file in io_infos.io_map.items(): + base_input = "/".join(input_file.split("/")[-2:]) + local_output_file = _LOCAL_OUTPUT_FILE.format(base_input) + runnables.append( + self._make_runnable( + self._vep_image_uri, + _VEP_RUN_SCRIPT, + local_input_dir + base_input, + local_output_file, + ) ) - ) + # TODO(nhon): Add watchdog # if self._watchdog_file: # runnables.append(self._make_runnable( # self._vep_image_uri, @@ -561,12 +616,13 @@ def _create_runnables(self, input_file: str, output_file: str) -> list: # _LOCAL_OUTPUT_FILE # )) + # Copy the output file to the output directory. runnables.append( self._make_runnable( _GSUTIL_IMAGE, "sh", "-c", - f"gsutil cp {_LOCAL_OUTPUT_FILE} {output_file} 2>&1", + f"gsutil -m rsync -r -x \".*_vep_output\.vcf_summary.html$\" {_LOCAL_OUTPUT_DIR} {vep_output_dir} 2>&1", ) ) diff --git a/gcp_variant_transforms/libs/bigquery_util.py b/gcp_variant_transforms/libs/bigquery_util.py index d85d55760..69dd67829 100644 --- a/gcp_variant_transforms/libs/bigquery_util.py +++ b/gcp_variant_transforms/libs/bigquery_util.py @@ -34,7 +34,7 @@ TABLE_SUFFIX_SEPARATOR = '__' _BQ_DELETE_TABLE_COMMAND = 'bq rm -f -t {FULL_TABLE_ID}' -_GCS_DELETE_FILES_COMMAND = 'gsutil -m rm -f -R {ROOT_PATH}' +_GCS_DELETE_FILES_COMMAND = 'gsutil -m rm -f -R {ROOT_PATH} 2>&1' BQ_NUM_RETRIES = 5 diff --git a/gcp_variant_transforms/libs/partitioning_test.py b/gcp_variant_transforms/libs/partitioning_test.py index 26db84f5f..336e7dc1f 100644 --- a/gcp_variant_transforms/libs/partitioning_test.py +++ b/gcp_variant_transforms/libs/partitioning_test.py @@ -58,13 +58,13 @@ def test_calculate_optimal_range_interval_large(self): class FlattenCallColumnTest(unittest.TestCase): """Test cases for class `FlattenCallColumn`.""" + @mock.patch('google.cloud.bigquery.Client') @mock.patch('gcp_variant_transforms.libs.partitioning_test.partitioning.' 'FlattenCallColumn._find_one_non_empty_table') - def setUp(self, mock_find_non_empty): - # We never query this table for running the following test, however, the - # mock values are based on this table's schema. In other words: - # mock_columns.return_value = self._flatter._get_column_names() - # mock_sub_fields.return_value = self._flatter._get_call_sub_fields() + def setUp(self, mock_find_non_empty, mock_bq_client): + # Mock the BigQuery client + mock_bq_client.return_value = mock.MagicMock() + input_base_table = ('gcp-variant-transforms-test:' 'bq_to_vcf_integration_tests.' 
'merge_option_move_to_calls') diff --git a/gcp_variant_transforms/libs/variant_sharding.py b/gcp_variant_transforms/libs/variant_sharding.py index 957316446..76dbfead2 100644 --- a/gcp_variant_transforms/libs/variant_sharding.py +++ b/gcp_variant_transforms/libs/variant_sharding.py @@ -24,6 +24,7 @@ available at gcp_variant_transforms/testing/data/misc/*.yaml """ +import logging from collections import defaultdict import re @@ -84,7 +85,7 @@ def get_shard_index(self, pos=0): If no interval is found returns _UNDEFINED_SHARD_INDEX. """ - matched_regions = self._interval_tree.search(pos) + matched_regions = self._interval_tree.at(pos) # Ensure at most one region is matching to the give position. assert len(matched_regions) <= 1 if len(matched_regions) == 1: @@ -277,6 +278,7 @@ def get_shard_index(self, chrom, pos=None): shard_index = self._region_to_shard.get(chrom, _UNDEFINED_SHARD_INDEX) if shard_index == _UNDEFINED_SHARD_INDEX: + logging.warning(f"Position {pos} on chromosome {chrom} left as residual because it is not defined in any region in the sharding config") return self._residual_index else: return shard_index diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 99374b7b2..20e18e978 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -315,6 +315,7 @@ class AnnotationOptions(VariantTransformsOptions): _OUTPUT_DIR_FLAG = 'annotation_output_dir' _VEP_IMAGE_FLAG = 'vep_image_uri' _VEP_CACHE_FLAG = 'vep_cache_path' + _NUM_RUNNABLES_PER_JOB = 'number_of_runnables_per_job' def add_arguments(self, parser): # type: (argparse.ArgumentParser) -> None @@ -369,7 +370,7 @@ def add_arguments(self, parser): 'process of running VEP pipelines.')) parser.add_argument( '--' + AnnotationOptions._VEP_IMAGE_FLAG, - default='gcr.io/cloud-lifesciences/vep:104', + default='gcr.io/cloud-lifesciences/vep:104', # TODO: The Docker image must be rebuilt and hosted elsewhere help=('The URI of the latest docker image for VEP.')) parser.add_argument( '--' + AnnotationOptions._VEP_CACHE_FLAG, @@ -434,8 +435,17 @@ def add_arguments(self, parser): parser.add_argument( '--location', default='us-central1', - help=('The location in which to call Life Sciences API which will ' + help=('The location in which to call Cloud Batch which will ' 'start the vep_runner.')) + parser.add_argument( + '--' + AnnotationOptions._NUM_RUNNABLES_PER_JOB, + type=int, default=95, + help=('The maximum number of runnables (e.g. VEP jobs) to create per job. ' + 'The batch system only supports a maximum of 100 runnables per job, ' + 'so this flag cannot be set higher than 95. This ensures that there ' + 'are always 5 runnables reserved for system cycles. You may change this ' + 'flag to a smaller value if you have a dataset with a lot of samples.')) + def validate(self, parsed_args): # type: (argparse.Namespace) -> None @@ -453,6 +463,11 @@ def validate(self, parsed_args): if vep_cache and not vep_cache.startswith('gs://'): raise ValueError('Flag {} should start with gs://, got {}'.format( AnnotationOptions._VEP_CACHE_FLAG, vep_cache)) + if not 1 <= args_dict[AnnotationOptions._NUM_RUNNABLES_PER_JOB] <= 95: + # 100 is the maximum number of runnables per job in Dataflow. + # we limit it to 95 becasuse we reserve 5 for fixed steps to use. 
+ raise ValueError('Flag --{} must be between 1 and 95'.format( + AnnotationOptions._NUM_RUNNABLES_PER_JOB)) class FilterOptions(VariantTransformsOptions): diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 9bb87f26d..8c26100b5 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -80,7 +80,7 @@ def parse_args(argv, command_line_options): known_args.input_pattern, known_args.input_file) # https://github.com/googlegenomics/gcp-variant-transforms/issues/667 - pipeline_args.extend(['--save_main_session', 'True']) + pipeline_args.extend(['--save_main_session', 'true']) return known_args, pipeline_args diff --git a/gcp_variant_transforms/testing/bigquery_validator.py b/gcp_variant_transforms/testing/bigquery_validator.py new file mode 100755 index 000000000..25d9e7c73 --- /dev/null +++ b/gcp_variant_transforms/testing/bigquery_validator.py @@ -0,0 +1,635 @@ +#!/usr/bin/env python3 +""" +VCF Ingestion Pipeline Validation Script + +Compares sampled genotype data (from CSV) against BigQuery tables to validate +the Google Variant Transforms ingestion pipeline. +""" + +from google.cloud import bigquery +import pandas as pd +from typing import Dict, List +import collections +import argparse +import sys + + +def setup_bigquery_client(project_id: str): + """Initialize BigQuery client""" + client = bigquery.Client(project=project_id) + return client + + +def normalize_bigquery_genotype(genotype_list): + """Convert BigQuery genotype format to normalized tuple""" + if not genotype_list or len(genotype_list) == 0: + return (None, None) + + # Handle various no-call representations + if any(g is None or g == -1 or g == "." for g in genotype_list): + return (None, None) + + # Convert to integers and sort + try: + int_genotypes = [int(g) for g in genotype_list] + return tuple(sorted(int_genotypes)) + except (ValueError, TypeError): + # Handle string genotypes like "./." + if all(g == "." for g in genotype_list): + return (None, None) + return tuple(sorted(genotype_list)) + + +def normalize_csv_genotype(genotype_str: str): + """Convert CSV genotype string to normalized format for comparison""" + if not genotype_str or genotype_str in ["./.", ".|.", "."]: + return "./." 
+ + # This will be compared against the BigQuery genotype after conversion + if "|" in genotype_str: + genotype_str = "/".join(genotype_str.split("|")) + return genotype_str + + +def load_csv_samples(csv_path: str) -> Dict: + """Load genotype samples from CSV file""" + + print(f"Loading genotype samples from: {csv_path}", file=sys.stderr) + + df = pd.read_csv(csv_path) + + print(f"Loaded {len(df)} genotype samples", file=sys.stderr) + print(f"Columns: {list(df.columns)}", file=sys.stderr) + + # Validate expected columns + expected_cols = ["reference_name", "start_position", "sample_name", "genotype"] + missing_cols = set(expected_cols) - set(df.columns) + if missing_cols: + raise ValueError(f"Missing expected columns in CSV: {missing_cols}") + + # Group by genotype value + genotype_coverage = collections.defaultdict(list) + + for _, row in df.iterrows(): + chrom = str(row["reference_name"]) + pos = int(row["start_position"]) + sample_name = str(row["sample_name"]) + genotype = normalize_csv_genotype(str(row["genotype"])) + + genotype_coverage[genotype].append((chrom, pos, sample_name)) + + print(f"Found {len(genotype_coverage)} distinct genotype values:", file=sys.stderr) + for genotype, locations in genotype_coverage.items(): + print(f" {genotype}: {len(locations)} samples", file=sys.stderr) + + return {"genotype_coverage": dict(genotype_coverage), "total_samples": len(df)} + + +def get_sample_mapping(client: bigquery.Client, table_base_name: str) -> Dict[str, str]: + """Fetch sample ID to sample name mapping from sample_info table""" + + sample_info_table = f"{table_base_name}__sample_info" + + query = f""" + SELECT sample_id, sample_name + FROM `{sample_info_table}` + ORDER BY sample_id + """ + + print(f"Fetching sample mapping from {sample_info_table}...", file=sys.stderr) + + try: + query_job = client.query(query) + df = query_job.to_dataframe() + + # Create bidirectional mapping + id_to_name = dict(zip(df["sample_id"].astype(str), df["sample_name"])) + name_to_id = dict( + zip(df["sample_name"], df["sample_id"]) + ) # Keep sample_id as integer + + print(f"Found {len(id_to_name)} sample mappings", file=sys.stderr) + print( + f"Sample mapping examples: {dict(list(id_to_name.items())[:3])}", + file=sys.stderr, + ) + + return {"id_to_name": id_to_name, "name_to_id": name_to_id} + + except Exception as e: + print(f"Error fetching sample mapping: {e}", file=sys.stderr) + print("Will attempt to proceed without sample mapping...", file=sys.stderr) + return {"id_to_name": {}, "name_to_id": {}} + + +def build_validation_query( + table_base_name: str, + chrom: str, + positions: List[int], + sample_names: List[str] = None, + sample_mapping: Dict = None, +): + """Build SQL query for a specific chromosome table with proper sample mapping""" + + # Build the chromosome-specific table name + table_name = f"{table_base_name}__{chrom}" + sample_info_table = f"{table_base_name}__sample_info" + + # Create position filter for this chromosome + if len(positions) == 1: + position_filter = f"v.start_position = {positions[0]}" + else: + position_list = ", ".join(str(pos) for pos in positions) + position_filter = f"v.start_position IN ({position_list})" + + # Create sample filter using sample names if provided + sample_filter = "" + if sample_names and sample_mapping and sample_mapping["name_to_id"]: + # Convert sample names to BigQuery sample IDs + bq_sample_ids = [] + for sample_name in sample_names: + if sample_name in sample_mapping["name_to_id"]: + bq_sample_ids.append( + 
sample_mapping["name_to_id"][sample_name] + ) # Keep as integer + else: + print( + f"Warning: Sample {sample_name} not found in mapping", + file=sys.stderr, + ) + + if bq_sample_ids: + sample_list = ", ".join(str(sid) for sid in bq_sample_ids) + sample_filter = f"AND call.sample_id IN ({sample_list})" + + # Use JOIN to get sample names back - note: using sample_id instead of call_set_name + query = f""" + SELECT + v.reference_name, + v.start_position, + call.sample_id, + s.sample_name, + call.genotype as genotype + FROM `{table_name}` v, + UNNEST(call) as call + LEFT JOIN `{sample_info_table}` s + ON call.sample_id = s.sample_id + WHERE {position_filter} + {sample_filter} + ORDER BY v.start_position, call.sample_id + """ + + return query + + +def fetch_bigquery_data(client: bigquery.Client, query: str) -> pd.DataFrame: + """Execute BigQuery query and return results as DataFrame""" + print("Executing BigQuery query...", file=sys.stderr) + print(f"Query preview: {query[:200]}...", file=sys.stderr) + + query_job = client.query(query) + df = query_job.to_dataframe() + + print(f"Retrieved {len(df)} rows from BigQuery", file=sys.stderr) + return df + + +def compare_csv_bigquery( + csv_samples: Dict, + bq_project_id: str, + bq_table_base_name: str, + sample_subset: List[str] = None, +): + """Compare CSV samples against BigQuery data across multiple chromosome tables""" + + client = setup_bigquery_client(bq_project_id) + + # First, get the sample mapping + sample_mapping = get_sample_mapping(client, bq_table_base_name) + + # Group positions by chromosome and collect all sample names from CSV + positions_by_chrom = collections.defaultdict(list) + csv_lookup = {} # (chrom, pos, sample_name) -> genotype + all_csv_samples = set() # Track all samples mentioned in CSV + + print("Preparing CSV data for comparison...", file=sys.stderr) + for genotype, locations in csv_samples["genotype_coverage"].items(): + for chrom, pos, sample_name in locations: + positions_by_chrom[chrom].append(pos) + csv_lookup[(chrom, pos, sample_name)] = genotype + all_csv_samples.add(sample_name) + + # Use sample subset if provided, otherwise use all samples from CSV + samples_to_query = sample_subset if sample_subset else list(all_csv_samples) + + print( + f"Will check {len(positions_by_chrom)} chromosomes with {len(csv_lookup)} total sample-position combinations", + file=sys.stderr, + ) + print( + f"Filtering BigQuery to {len(samples_to_query)} specific samples", + file=sys.stderr, + ) + + all_bq_data = [] + + # Query each chromosome table separately + for chrom, positions in positions_by_chrom.items(): + unique_positions = list(set(positions)) # Remove duplicates + print( + f"Querying chromosome {chrom}: {len(unique_positions)} unique positions", + file=sys.stderr, + ) + + # Batch positions within chromosome to avoid query size limits + batch_size = 100 # Can be larger since we're not crossing chromosomes + position_batches = [ + unique_positions[i : i + batch_size] + for i in range(0, len(unique_positions), batch_size) + ] + + for batch_num, position_batch in enumerate(position_batches): + if len(position_batches) > 1: + print( + f" Batch {batch_num + 1}/{len(position_batches)} for {chrom}", + file=sys.stderr, + ) + + try: + # Build and execute query for this chromosome batch + # Pass the samples we want to filter to + query = build_validation_query( + bq_table_base_name, + chrom, + position_batch, + samples_to_query, + sample_mapping, + ) + batch_df = fetch_bigquery_data(client, query) + + # Add chromosome info to the 
dataframe if not present + batch_df["reference_name"] = chrom + + all_bq_data.append(batch_df) + + except Exception as e: + print( + f"Error querying {chrom} positions {position_batch}: {e}", + file=sys.stderr, + ) + # Continue with other chromosomes + continue + + # Combine all results + if all_bq_data: + bq_df = pd.concat(all_bq_data, ignore_index=True) + else: + print("No data returned from BigQuery!", file=sys.stderr) + return None + + print(f"Total BigQuery records retrieved: {len(bq_df)}", file=sys.stderr) + + # Perform comparison + return validate_data_consistency(csv_lookup, bq_df) + + +def convert_bigquery_genotype_to_string(genotype_list) -> str: + """Convert BigQuery genotype array back to VCF-style string for comparison""" + # Handle None or empty cases + if genotype_list is None: + return "./." + + # Convert to list if it's a numpy array + if hasattr(genotype_list, "tolist"): + genotype_list = genotype_list.tolist() + + # Check if empty + if len(genotype_list) == 0: + return "./." + + # Handle no-call representations + if any(g is None or g == -1 or str(g) == "." for g in genotype_list): + return "./." + + # Convert to strings and join with separator + try: + if len(genotype_list) == 1: + # Haploid (e.g., chrY, chrX in males) + return str(genotype_list[0]) + elif len(genotype_list) == 2: + # Diploid + return f"{genotype_list[0]}/{genotype_list[1]}" + else: + # Polyploid - join with / + return "/".join(str(g) for g in genotype_list) + except (ValueError, TypeError): + return "./." + + +def validate_data_consistency(csv_lookup: Dict, bq_df: pd.DataFrame): + """Compare CSV and BigQuery data to find discrepancies""" + + mismatches = [] + matches = [] + bq_missing = [] + csv_extra = [] + + # Create BigQuery lookup using sample names from the JOIN + bq_lookup = {} + for _, row in bq_df.iterrows(): + chrom = str(row["reference_name"]) + pos = int(row["start_position"]) + sample_name = row["sample_name"] # Now we have the original sample name + + if pd.isna(sample_name): + print( + f"Warning: Missing sample name for sample_id {row['sample_id']} at {chrom}:{pos}", + file=sys.stderr, + ) + continue + + bq_genotype = convert_bigquery_genotype_to_string(row["genotype"]) + + bq_lookup[(chrom, pos, sample_name)] = bq_genotype + + print("Comparing CSV vs BigQuery data...", file=sys.stderr) + print( + f"CSV entries: {len(csv_lookup)}, BigQuery entries: {len(bq_lookup)}", + file=sys.stderr, + ) + + # Check each CSV sample + for (chrom, pos, sample_name), csv_genotype in csv_lookup.items(): + key = (chrom, pos, sample_name) + + if key in bq_lookup: + bq_genotype = bq_lookup[key] + if csv_genotype == bq_genotype: + matches.append( + { + "chrom": chrom, + "pos": pos, + "sample": sample_name, + "genotype": csv_genotype, + "status": "MATCH", + } + ) + else: + mismatches.append( + { + "chrom": chrom, + "pos": pos, + "sample": sample_name, + "csv_genotype": csv_genotype, + "bq_genotype": bq_genotype, + "status": "MISMATCH", + } + ) + else: + bq_missing.append( + { + "chrom": chrom, + "pos": pos, + "sample": sample_name, + "csv_genotype": csv_genotype, + "status": "MISSING_IN_BQ", + } + ) + + # Check for extra BigQuery data + for (chrom, pos, sample_name), bq_genotype in bq_lookup.items(): + if (chrom, pos, sample_name) not in csv_lookup: + csv_extra.append( + { + "chrom": chrom, + "pos": pos, + "sample": sample_name, + "bq_genotype": bq_genotype, + "status": "EXTRA_IN_BQ", + } + ) + + # Summary report + total_comparisons = len(csv_lookup) + print("\n" + "=" * 60, file=sys.stderr) + print("VALIDATION 
RESULTS", file=sys.stderr) + print("=" * 60, file=sys.stderr) + print(f"Total comparisons: {total_comparisons}", file=sys.stderr) + print( + f"Matches: {len(matches)} ({len(matches) / total_comparisons * 100:.1f}%)", + file=sys.stderr, + ) + print( + f"Mismatches: {len(mismatches)} ({len(mismatches) / total_comparisons * 100:.1f}%)", + file=sys.stderr, + ) + print( + f"Missing in BigQuery: {len(bq_missing)} ({len(bq_missing) / total_comparisons * 100:.1f}%)", + file=sys.stderr, + ) + print( + f"Extra in BigQuery: {len(csv_extra)} (expected from query structure)", + file=sys.stderr, + ) + + if mismatches: + print("\nFirst 10 MISMATCHES:", file=sys.stderr) + for i, mismatch in enumerate(mismatches[:10]): + print( + f" {mismatch['chrom']}:{mismatch['pos']} {mismatch['sample']}: " + f"CSV={mismatch['csv_genotype']} vs BQ={mismatch['bq_genotype']}", + file=sys.stderr, + ) + + if bq_missing: + print("\nFirst 10 MISSING IN BIGQUERY:", file=sys.stderr) + for i, missing in enumerate(bq_missing[:10]): + print( + f" {missing['chrom']}:{missing['pos']} {missing['sample']}: " + f"CSV={missing['csv_genotype']}", + file=sys.stderr, + ) + + if csv_extra: + print("\nFirst 5 EXTRA IN BIGQUERY:", file=sys.stderr) + for i, extra in enumerate(csv_extra[:5]): + print( + f" {extra['chrom']}:{extra['pos']} {extra['sample']}: " + f"BQ={extra['bq_genotype']}", + file=sys.stderr, + ) + + return { + "matches": matches, + "mismatches": mismatches, + "bq_missing": bq_missing, + "csv_extra": csv_extra, + "summary": { + "total_comparisons": total_comparisons, + "match_rate": len(matches) / total_comparisons + if total_comparisons > 0 + else 0, + "mismatch_count": len(mismatches), + "missing_count": len(bq_missing), + "extra_count": len(csv_extra), + }, + } + + +def generate_output_files(csv_filename: str, validation_results: Dict): + """Generate output CSV files based on input filename""" + import os + + # Get base filename without extension + base_name = os.path.splitext(csv_filename)[0] + + # Create output filenames + mismatch_file = f"{base_name}.bq_mismatch.csv" + missing_file = f"{base_name}.bq_missing.csv" + + # Write mismatch results + if validation_results["mismatches"]: + mismatch_df = pd.DataFrame(validation_results["mismatches"]) + mismatch_df.to_csv(mismatch_file, index=False) + print( + f"Wrote {len(validation_results['mismatches'])} mismatches to: {mismatch_file}", + file=sys.stderr, + ) + else: + print("No mismatches found - no mismatch file created", file=sys.stderr) + + # Write missing results + if validation_results["bq_missing"]: + missing_df = pd.DataFrame(validation_results["bq_missing"]) + missing_df.to_csv(missing_file, index=False) + print( + f"Wrote {len(validation_results['bq_missing'])} missing entries to: {missing_file}", + file=sys.stderr, + ) + else: + print("No missing entries found - no missing file created", file=sys.stderr) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Validate VCF ingestion pipeline by comparing genotype samples (from CSV) against BigQuery tables", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Basic validation + python bigquery_validation.py samples.csv biovu-cloud-storage biovu-cloud-storage.vcf_test_import_2.chry10k + + # Limit to specific samples + python bigquery_validation.py samples.csv my-project my-project.dataset.table_base --samples Person_1,Person_2,Person_3 + +Expected CSV format: + reference_name,start_position,sample_name,genotype + chr11,61489,Person_4857,0/1 + 
chr8,60209,Person_1,./. + +Output files: + input_name.bq_mismatch.csv - Genotype mismatches between CSV and BigQuery + input_name.bq_missing.csv - Entries missing in BigQuery + +Note: The table_base_name should be the base name without chromosome suffix. + For example, if your tables are named 'dataset.table__chr1', 'dataset.table__chr2', etc., + then use 'dataset.table' as the table_base_name. + """, + ) + + parser.add_argument( + "csv_file", + help="Path to CSV file with genotype samples (from genotype_sampler.py)", + ) + + parser.add_argument( + "bq_project_id", help='BigQuery project ID (e.g., "biovu-cloud-storage")' + ) + + parser.add_argument( + "bq_table_base_name", + help='BigQuery table base name without chromosome suffix (e.g., "project.dataset.table_base")', + ) + + parser.add_argument( + "--samples", + help="Comma-separated list of specific sample names to validate (optional)", + ) + + parser.add_argument( + "--verbose", "-v", action="store_true", help="Enable verbose output" + ) + + args = parser.parse_args() + + # Parse sample list if provided + sample_subset = None + if args.samples: + sample_subset = [s.strip() for s in args.samples.split(",")] + print(f"Will validate only these samples: {sample_subset}", file=sys.stderr) + + print("=" * 60, file=sys.stderr) + print("VCF INGESTION PIPELINE VALIDATION", file=sys.stderr) + print("=" * 60, file=sys.stderr) + print(f"CSV file: {args.csv_file}", file=sys.stderr) + print(f"BigQuery project: {args.bq_project_id}", file=sys.stderr) + print(f"Table base name: {args.bq_table_base_name}", file=sys.stderr) + if sample_subset: + print(f"Sample filter: {len(sample_subset)} specific samples", file=sys.stderr) + print("", file=sys.stderr) + + # Load CSV data + csv_samples = load_csv_samples(args.csv_file) + + print("\nQuerying BigQuery tables...", file=sys.stderr) + print("Will query chromosome-specific tables like:", file=sys.stderr) + print(f" {args.bq_table_base_name}__chr1", file=sys.stderr) + print(f" {args.bq_table_base_name}__chrY", file=sys.stderr) + print(f" {args.bq_table_base_name}__sample_info", file=sys.stderr) + print("", file=sys.stderr) + + validation_results = compare_csv_bigquery( + csv_samples, args.bq_project_id, args.bq_table_base_name, sample_subset + ) + + if validation_results is None: + print("❌ FAILED: Could not retrieve data from BigQuery", file=sys.stderr) + sys.exit(1) + + # Final summary + summary = validation_results["summary"] + if summary["mismatch_count"] == 0 and summary["missing_count"] == 0: + print( + "\n🎉 SUCCESS: All sampled data matches between CSV and BigQuery!", + file=sys.stderr, + ) + print( + "Your ingestion pipeline appears to be working correctly.", file=sys.stderr + ) + else: + print("\n⚠️ ISSUES FOUND:", file=sys.stderr) + if summary["mismatch_count"] > 0: + print( + f" - {summary['mismatch_count']} genotype mismatches", file=sys.stderr + ) + if summary["missing_count"] > 0: + print( + f" - {summary['missing_count']} entries missing in BigQuery", + file=sys.stderr, + ) + + print( + "\nThis suggests there may be bugs in your ingestion pipeline.", + file=sys.stderr, + ) + print( + "Check the detailed output above for specific coordinates and samples.", + file=sys.stderr, + ) + + # Show match rate + match_rate = summary["match_rate"] * 100 + print(f"\nOverall match rate: {match_rate:.1f}%", file=sys.stderr) + + # Generate output CSV files + generate_output_files(args.csv_file, validation_results) diff --git a/gcp_variant_transforms/testing/integration/run_tests_common.py 
b/gcp_variant_transforms/testing/integration/run_tests_common.py index a65eff44b..dcbae0d18 100644 --- a/gcp_variant_transforms/testing/integration/run_tests_common.py +++ b/gcp_variant_transforms/testing/integration/run_tests_common.py @@ -27,7 +27,7 @@ from collections import namedtuple from typing import Dict, List, Optional # pylint: disable=unused-import - +# TODO: The Docker image must be rebuilt and hosted elsewhere _DEFAULT_IMAGE_NAME = 'gcr.io/cloud-lifesciences/gcp-variant-transforms' _DEFAULT_SDK_CONTAINER_IMAGE_NAME = 'gcr.io/cloud-lifesciences/variant-transforms-custom-runner' @@ -130,7 +130,7 @@ def print_results(self): def form_command(project, region, temp_location, image, sdk_container_image, tool_name, args): # type: (str, str, str, str, str, str, List[str]) -> List[str] - return ['/opt/gcp_variant_transforms/src/docker/pipelines_runner.sh', + return ['/opt/gcp_variant_transforms/src/docker/batch_runner.sh', '--project', project, '--region', region, '--docker_image', image, diff --git a/gcp_variant_transforms/testing/vcf_genotype_sampler.py b/gcp_variant_transforms/testing/vcf_genotype_sampler.py new file mode 100755 index 000000000..467a4ff29 --- /dev/null +++ b/gcp_variant_transforms/testing/vcf_genotype_sampler.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python3 +""" +VCF Genotype Sampler + +Samples genotype values from a VCF file for validation purposes. +Outputs a CSV with up to MAX_SAMPLES_PER_GENOTYPE occurrences of each distinct genotype. +""" + +import pandas as pd +import sys +import argparse +import collections +import random +from typing import Dict, List, Tuple + +# Configuration +MAX_SAMPLES_PER_GENOTYPE = 10 +ROW_SAMPLE_RATE = 0.1 # Sample 10% of rows for efficiency +CHUNK_SIZE = 100 # Process this many rows at a time + + +def get_vcf_info(vcf_path: str) -> Tuple[int, List[str]]: + """Get VCF header info without loading the full file""" + with open(vcf_path, "r") as f: + for line_num, line in enumerate(f): + if line.startswith("##"): + continue + elif line.startswith("#CHROM"): + # This is the header line + headers = line.strip().split("\t") + format_idx = headers.index("FORMAT") + sample_names = headers[format_idx + 1 :] + return line_num, headers, sample_names + else: + raise ValueError("No #CHROM header line found in VCF file") + + raise ValueError("VCF file appears to be empty or malformed") + + +def process_vcf_in_chunks( + vcf_path: str, used_1_based_coordinate:bool, sample_rate: float = ROW_SAMPLE_RATE, chunk_size: int = CHUNK_SIZE, +) -> Tuple[Dict, collections.Counter, int, int]: + """Process VCF file in chunks to avoid memory issues""" + + print(f"Loading VCF file: {vcf_path}", file=sys.stderr) + + # Get header info + header_line_num, headers, sample_names = get_vcf_info(vcf_path) + + print(f"Found {len(sample_names)} samples", file=sys.stderr) + print( + f"Sample names: {sample_names[:5]}{'...' 
if len(sample_names) > 5 else ''}", + file=sys.stderr, + ) + + genotype_samples = collections.defaultdict(list) + genotype_counts = collections.Counter() + total_genotypes = 0 + nocall_count = 0 + rows_processed = 0 + + print(f"Processing file in chunks of {chunk_size} rows...", file=sys.stderr) + + # Read file in chunks + chunk_reader = pd.read_csv( + vcf_path, + sep="\t", + skiprows=header_line_num + 1, + names=headers, + chunksize=chunk_size, + low_memory=False, + dtype={"#CHROM": str, "ALT": str}, + ) + + for chunk_num, chunk_df in enumerate(chunk_reader): + # Apply row sampling within each chunk + if sample_rate < 1.0: + chunk_df = chunk_df.sample( + frac=sample_rate, random_state=42 + chunk_num + ).reset_index(drop=True) + if chunk_num == 0: # Only log this once + print( + f"Sampling {sample_rate * 100:.1f}% of rows within each chunk", + file=sys.stderr, + ) + + # Process this chunk + chunk_genotypes, chunk_counts, chunk_total, chunk_nocalls = process_chunk( + chunk_df, sample_names, genotype_samples, + used_1_based_coordinate = used_1_based_coordinate + ) + + # Update global counters + genotype_counts.update(chunk_counts) + total_genotypes += chunk_total + nocall_count += chunk_nocalls + rows_processed += len(chunk_df) + + # Progress reporting + if (chunk_num + 1) % 10 == 0: + print( + f"Processed {chunk_num + 1} chunks ({rows_processed:,} rows)", + file=sys.stderr, + ) + + # Early exit if we have enough samples for all common genotypes + if chunk_num > 50 and all( + len(samples) >= MAX_SAMPLES_PER_GENOTYPE + for genotype, samples in genotype_samples.items() + if genotype_counts[genotype] > 10 + ): + print( + "Early exit: sufficient samples collected for common genotypes", + file=sys.stderr, + ) + break + + print( + f"Completed processing: {rows_processed:,} rows in {chunk_num + 1} chunks", + file=sys.stderr, + ) + + return genotype_samples, genotype_counts, total_genotypes, nocall_count + + +def process_chunk( + chunk_df: pd.DataFrame, sample_names: List[str], genotype_samples: Dict, + used_1_based_coordinate = False +) -> Tuple[Dict, collections.Counter, int, int]: + """Process a single chunk of the VCF file""" + + chunk_counts = collections.Counter() + chunk_total = 0 + chunk_nocalls = 0 + + for idx, row in chunk_df.iterrows(): + reference_name = row["#CHROM"] + start_position = row["POS"] + + # Process each sample for this variant + for sample_name in sample_names: + # Skip if we already have enough samples for any genotype + sample_value = row[sample_name] + genotype_str = extract_genotype_from_sample(sample_value) + + chunk_total += 1 + chunk_counts[genotype_str] += 1 + + # Track no-calls + if genotype_str in ["./.", ".|.", "."]: + chunk_nocalls += 1 + + # Only collect if we don't have enough samples yet + if len(genotype_samples[genotype_str]) < MAX_SAMPLES_PER_GENOTYPE: + genotype_samples[genotype_str].append( + { + "reference_name": reference_name, + "start_position": start_position - (int(not used_1_based_coordinate)), + "sample_name": sample_name, + "genotype": genotype_str, + } + ) + + return genotype_samples, chunk_counts, chunk_total, chunk_nocalls + + +def extract_genotype_from_sample(sample_value) -> str: + """Extract genotype from sample column (before first colon if FORMAT data present)""" + if pd.isna(sample_value): + return "./." + + # Convert to string first in case pandas read it as int/float + sample_str = str(sample_value) + + # Sample format is typically GT:DP:GQ:... 
where GT is the genotype
+    # We want just the GT part
+    return sample_str.split(":")[0]
+
+
+def create_output_dataframe(
+    genotype_samples: Dict,
+    genotype_counts: collections.Counter,
+    total_genotypes: int,
+    nocall_count: int,
+) -> pd.DataFrame:
+    """Convert sampled genotypes to output DataFrame"""
+
+    all_samples = []
+    for genotype_str, samples in genotype_samples.items():
+        all_samples.extend(samples)
+
+    # Sort by chromosome and position for consistent output
+    all_samples.sort(
+        key=lambda x: (x["reference_name"], x["start_position"], x["sample_name"])
+    )
+
+    df = pd.DataFrame(all_samples)
+
+    # Summary statistics
+    unique_genotypes = len(genotype_counts)
+    nocall_pct = 100 * nocall_count / total_genotypes if total_genotypes > 0 else 0
+
+    print("\nSampling complete:", file=sys.stderr)
+    print(f"Total genotypes processed: {total_genotypes:,}", file=sys.stderr)
+    print(f"Unique genotype values: {unique_genotypes}", file=sys.stderr)
+    print(f"No-call rate: {nocall_pct:.1f}% ({nocall_count:,})", file=sys.stderr)
+
+    print("\nGenotype frequency summary:", file=sys.stderr)
+    for genotype, count in genotype_counts.most_common(10):
+        samples_collected = len(genotype_samples[genotype])
+        print(
+            f"  {genotype}: {count:,} occurrences, {samples_collected} samples collected",
+            file=sys.stderr,
+        )
+
+    if unique_genotypes > 10:
+        print(
+            f"  ... and {unique_genotypes - 10} more genotype values", file=sys.stderr
+        )
+
+    print("\nOutput summary:", file=sys.stderr)
+    print(f"Total samples in output: {len(df)}", file=sys.stderr)
+    print(f"Genotypes represented: {df['genotype'].nunique()}", file=sys.stderr)
+    print(f"Chromosomes covered: {df['reference_name'].nunique()}", file=sys.stderr)
+    print(f"Samples covered: {df['sample_name'].nunique()}", file=sys.stderr)
+
+    return df
+
+
+def main():
+    global MAX_SAMPLES_PER_GENOTYPE
+
+    parser = argparse.ArgumentParser(
+        description="Sample genotype values from a VCF file for validation",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  # Basic usage - output to stdout
+  python vcf_genotype_sampler.py input.vcf > samples.csv
+
+  # Save to specific file
+  python vcf_genotype_sampler.py input.vcf --output samples.csv
+
+  # Process full file (no row sampling)
+  python vcf_genotype_sampler.py input.vcf --full-file
+
+  # Adjust sample count per genotype
+  python vcf_genotype_sampler.py input.vcf --max-per-genotype 20
+
+Output CSV columns:
+  reference_name - Chromosome/contig name
+  start_position - Variant position (0-based unless --used-1-based-coordinate is set)
+  sample_name - Sample identifier
+  genotype - Genotype string exactly as in VCF
+    """,
+    )
+
+    parser.add_argument("vcf_file", help="Input VCF file path")
+
+    parser.add_argument("--output", "-o", help="Output CSV file (default: stdout)")
+
+    parser.add_argument(
+        "--max-per-genotype",
+        type=int,
+        default=MAX_SAMPLES_PER_GENOTYPE,
+        help=f"Maximum samples to collect per genotype value (default: {MAX_SAMPLES_PER_GENOTYPE})",
+    )
+
+    parser.add_argument(
+        "--sample-rate",
+        type=float,
+        default=ROW_SAMPLE_RATE,
+        help=f"Fraction of rows to sample for efficiency (default: {ROW_SAMPLE_RATE})",
+    )
+
+    parser.add_argument(
+        "--used-1-based-coordinate",
+        action="store_true",
+        help="Indicates that the data in BigQuery uses 1-based coordinates, so sampled positions are not shifted to 0-based",
+    )
+
+    parser.add_argument(
+        "--full-file",
+        action="store_true",
+        help="Process entire file without row sampling",
+    )
+
+    parser.add_argument(
+        "--chunk-size",
+        type=int,
+        default=CHUNK_SIZE,
+        help=f"Number of rows to process at once (default: {CHUNK_SIZE})",
+    )
+
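+    # Note: the --seed flag below currently seeds only Python's `random`
+    # module; the per-chunk pandas sampling in process_vcf_in_chunks uses a
+    # fixed random_state of 42 + chunk index, so row sampling is reproducible
+    # independently of this value.
+    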
parser.add_argument( + "--seed", type=int, default=42, help="Random seed for reproducible sampling" + ) + + args = parser.parse_args() + + # Set global constants from arguments + MAX_SAMPLES_PER_GENOTYPE = args.max_per_genotype + + # Set random seed + random.seed(args.seed) + + # Determine sampling rate + sample_rate = 1.0 if args.full_file else args.sample_rate + + print("=" * 60, file=sys.stderr) + print("VCF GENOTYPE SAMPLER", file=sys.stderr) + print("=" * 60, file=sys.stderr) + print(f"Input file: {args.vcf_file}", file=sys.stderr) + print(f"Max samples per genotype: {args.max_per_genotype}", file=sys.stderr) + print(f"Row sampling rate: {sample_rate * 100:.1f}%", file=sys.stderr) + print(f"Chunk size: {args.chunk_size}", file=sys.stderr) + print(f"Random seed: {args.seed}", file=sys.stderr) + print(f"Output: {'stdout' if not args.output else args.output}", file=sys.stderr) + print("", file=sys.stderr) + + # Process VCF data in chunks + genotype_samples, genotype_counts, total_genotypes, nocall_count = ( + process_vcf_in_chunks(args.vcf_file, args.used_1_based_coordinate, sample_rate, args.chunk_size) + ) + + # Create output DataFrame + output_df = create_output_dataframe( + genotype_samples, genotype_counts, total_genotypes, nocall_count + ) + + # Write output + if args.output: + output_df.to_csv(args.output, index=False) + print(f"\nResults written to: {args.output}", file=sys.stderr) + else: + output_df.to_csv(sys.stdout, index=False) + + print("\n✅ Sampling completed successfully", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 535e0daf7..d66563f31 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -34,10 +34,15 @@ import argparse # pylint: disable=unused-import from datetime import datetime +import time import logging import sys import tempfile from typing import List, Optional # pylint: disable=unused-import +from google.cloud import storage, batch_v1 +from google.cloud.batch_v1 import Job, TaskGroup, TaskSpec, Runnable +from google.cloud.batch_v1.types import AllocationPolicy, ComputeResource, LogsPolicy +from google.protobuf import duration_pb2 import apache_beam as beam from apache_beam import pvalue # pylint: disable=unused-import @@ -73,6 +78,9 @@ from gcp_variant_transforms.transforms import variant_to_avro from gcp_variant_transforms.transforms import write_variants_to_shards +from gcp_variant_transforms import helper +helper.setup_logging() + _COMMAND_LINE_OPTIONS = [ variant_transform_options.VcfReadOptions, variant_transform_options.BigQueryWriteOptions, @@ -439,6 +447,97 @@ def _get_avro_root_path(beam_pipeline_options): datetime.now().strftime('%Y%m%d_%H%M%S'), '') +def decompress_gz_files(input_pattern: str, region: str) -> List[str]: + gcs_client = storage.Client() + bucket_name, prefix = input_pattern.replace("gs://", "").split("/", 1) + bucket = gcs_client.get_bucket(bucket_name) + blobs = bucket.list_blobs(prefix=prefix) + + project_id = gcs_client.project + + batch_client = batch_v1.BatchServiceClient() + parent = f"projects/{project_id}/locations/{region}" + image_uri = "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" + + def get_gsuri(blob): + return f"gs://{bucket_name}/{blob.name}" + + list_decompressed_patterns = [] + for blob in blobs: + if blob.name.endswith(".gz"): + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + job_id = f"decompress-vcf-gz-file-{timestamp}" # Must match [a-z]([-a-z0-9]{0,62}[a-z0-9])? 
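+            # The single-task Cloud Batch job built below copies the .gz object
+            # to local disk in a cloud-sdk container, streams it through gunzip,
+            # and uploads the result to the same bucket as a timestamped .vcf.
+            # The destination name assumes the usual "<name>.vcf.gz" suffix
+            # (hence the [:-7] slice).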
+ + local_gz_path = "/tmp/" + blob.name.split("/")[-1] + local_decompressed_path = local_gz_path[:-3] + destination_blob_name = ( + blob.name[:-7] + + f'_decompressed_{timestamp}.vcf' + ) + + command = f""" + set -e && + gsutil cp {get_gsuri(blob)} {local_gz_path} 2>&1 && + gunzip -c {local_gz_path} > {local_decompressed_path} 2>&1 && + gsutil cp {local_decompressed_path} {get_gsuri(bucket.blob(destination_blob_name))} 2>&1 && + echo "Decompressed {blob.name} to {destination_blob_name}" + """ + runnable = Runnable( + container=Runnable.Container( + image_uri=image_uri, + commands=["bash", "-c", command] + ) + ) + task_spec = TaskSpec( + runnables=[runnable], + compute_resource=ComputeResource(cpu_milli=1000, memory_mib=1024), + max_run_duration=duration_pb2.Duration(seconds=3600), + ) + task_group = TaskGroup(task_spec=task_spec, task_count=1) + allocation_policy = AllocationPolicy( + instances=[ + AllocationPolicy.InstancePolicyOrTemplate( + policy=AllocationPolicy.InstancePolicy(machine_type="e2-medium") + ) + ] + ) + logs_policy = LogsPolicy( + destination=LogsPolicy.Destination.CLOUD_LOGGING + ) + + job = Job( + task_groups=[task_group], + allocation_policy=allocation_policy, + logs_policy=logs_policy, + labels={"job": "gcs-unzip-vcf-gz-file"}, + ) + + response = batch_client.create_job(parent=parent, job=job, job_id=job_id) + logging.info("Decompressing %s in batch job %s", blob.name, job_id) + + # Wait for the job to complete + job_response = batch_client.get_job(name=response.name) + while True: + if job_response.status.state == batch_v1.JobStatus.State.SUCCEEDED: + break + if job_response.status.state == batch_v1.JobStatus.State.FAILED: + raise RuntimeError(f"Batch job {job_id} failed") + + logging.info("Waiting for batch job %s to complete...", job_id) + time.sleep(10) + job_response = batch_client.get_job(name=response.name) + + logging.info("Decompressed %s to %s", blob.name, destination_blob_name) + blob = bucket.blob(destination_blob_name) + + if blob.name.endswith(".tbi"): + logging.info("Skipping index file: %s", blob.name) + else: + list_decompressed_patterns.append(get_gsuri(blob)) + + return list_decompressed_patterns + + def run(argv=None): # type: (List[str]) -> None """Runs VCF to BigQuery pipeline.""" @@ -446,6 +545,10 @@ def run(argv=None): known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) + if any('.gz' in pattern for pattern in known_args.all_patterns): + all_patterns_handler = sum([decompress_gz_files(pattern, known_args.location) for pattern in known_args.all_patterns], []) + known_args.all_patterns = all_patterns_handler + if known_args.auto_flags_experiment: _get_input_dimensions(known_args, pipeline_args) @@ -714,7 +817,7 @@ def run(argv=None): avro_root_path) raise e else: - logging.warning('All AVRO files were successfully loaded to BigQuery.') + logging.info('All AVRO files were successfully loaded to BigQuery.') if known_args.keep_intermediate_avro_files: logging.info('Since "--keep_intermediate_avro_files" flag is set, the ' 'AVRO files are kept and stored at: %s', avro_root_path) diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index d3625fec2..93b91049c 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -62,6 +62,9 @@ from gcp_variant_transforms.transforms import merge_headers from gcp_variant_transforms.transforms import merge_header_definitions +from gcp_variant_transforms import helper 
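+# Apply the shared logging configuration from helper.py, mirroring the
+# equivalent call added to vcf_to_bq.py above.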
+helper.setup_logging() + _COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions] diff --git a/requirements.txt b/requirements.txt index ed1b3bf09..098605ff0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ pysam==0.23.0 Cython==3.1.1 setuptools==80.8.0 -apache-beam[gcp]==2.65.0 +apache-beam[gcp]==2.65.0 avro-python3==1.10.2 google-cloud-core==2.4.3 google-api-python-client==2.169.0 @@ -9,6 +9,7 @@ intervaltree==3.1.0 mmh3==5.1.0 mock==5.2.0 google-cloud-storage==2.19.0 +google-cloud-batch==0.17.36 pyfarmhash==0.4.0 PyYAML==6.0.2 nose==1.3.7 diff --git a/setup.py b/setup.py index ee9063e06..4629efe9d 100644 --- a/setup.py +++ b/setup.py @@ -14,28 +14,16 @@ """Beam pipelines for processing variants based on VCF files.""" -from distutils.command.build import build as _build - import setuptools -class build(_build): # pylint: disable=invalid-name - """A build command class that will be invoked during package install. - - The package built using the current setup.py will be staged and later - installed in the worker using `pip install package'. This class will be - instantiated during install for this specific scenario and will trigger - running the custom commands specified. - """ - - setuptools.setup( name='gcp_variant_transforms', version='0.11.0', description=('Tool for transforming and processing VCF files in a ' 'scalable manner based on Apache Beam'), author='Google', - license='Apache 2.0', + license='Apache-2.0', # See https://pypi.python.org/pypi?%3Aaction=list_classifiers for the list # of values. @@ -45,12 +33,10 @@ class build(_build): # pylint: disable=invalid-name 'Topic :: Scientific/Engineering :: Bio-Informatics', 'Topic :: Scientific/Engineering :: Information Analysis', 'Topic :: System :: Distributed Computing', - 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.8', ], - test_suite='nose.collector', packages=setuptools.find_packages(), install_requires=[ 'pysam', @@ -60,11 +46,6 @@ class build(_build): # pylint: disable=invalid-name 'mock', 'pyfarmhash', 'pyyaml', - 'nose', 'cloudpickle==2.2.1', ], - cmdclass={ - # Command class instantiated and run during pip install scenarios. 
- 'build': build, - }, ) From 735691583c2e2d35ef406077b8e03ae0088694a6 Mon Sep 17 00:00:00 2001 From: dataxight Date: Sat, 9 Aug 2025 00:08:35 +0700 Subject: [PATCH 2/2] revert tests --- gcp_variant_transforms/beam_io/vcfio_test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/gcp_variant_transforms/beam_io/vcfio_test.py b/gcp_variant_transforms/beam_io/vcfio_test.py index 97ac20f88..5e9048ab4 100644 --- a/gcp_variant_transforms/beam_io/vcfio_test.py +++ b/gcp_variant_transforms/beam_io/vcfio_test.py @@ -277,12 +277,11 @@ def _assert_pipeline_read_files_record_count_equal( @unittest.skipIf(VCF_FILE_DIR_MISSING, 'VCF test file directory is missing') def test_read_single_file_large(self): test_data_conifgs = [ - # {'file': 'valid-4.0.vcf', 'num_records': 5}, - # {'file': 'valid-4.0.vcf.gz', 'num_records': 5}, - # {'file': 'valid-4.0.vcf.bz2', 'num_records': 5}, - # {'file': 'valid-4.1-large.vcf', 'num_records': 9882}, - # {'file': 'valid-4.2.vcf', 'num_records': 13}, - {'file': 'tmp8nrk3cg7.vcf', 'num_records': 8} + {'file': 'valid-4.0.vcf', 'num_records': 5}, + {'file': 'valid-4.0.vcf.gz', 'num_records': 5}, + {'file': 'valid-4.0.vcf.bz2', 'num_records': 5}, + {'file': 'valid-4.1-large.vcf', 'num_records': 9882}, + {'file': 'valid-4.2.vcf', 'num_records': 13} ] for config in test_data_conifgs: read_data = self._read_records(