From 8c6638f043a43b1ea33e43d208955eb903ed21a2 Mon Sep 17 00:00:00 2001
From: John Han
Date: Tue, 12 Jun 2018 03:54:54 -0700
Subject: [PATCH 1/4] Creates PoC pipeline to estimate disk usage of vcf_to_bq
 on Dataflow.

The pipeline uses each file's raw size, together with the raw and encoded
sizes of a short snippet from the beginning of each VCF file, to estimate
the total encoded size of the input.

The major blocking bug is that when the snippets are read from VCFs in
encoded form, lines are read more than once.
---
 .../beam_io/vcf_file_size_io.py               | 225 ++++++++++++++++++
 .../beam_io/vcf_file_size_io_test.py          |   0
 gcp_variant_transforms/beam_io/vcfio.py       |  48 ++--
 .../libs/preprocess_reporter.py               |  23 +-
 .../libs/preprocess_reporter_test.py          |  39 ++-
 .../options/variant_transform_options.py      |   7 +
 .../vcf_to_bq_preprocess.py                   |  32 ++-
 7 files changed, 344 insertions(+), 30 deletions(-)
 create mode 100644 gcp_variant_transforms/beam_io/vcf_file_size_io.py
 create mode 100644 gcp_variant_transforms/beam_io/vcf_file_size_io_test.py

diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io.py b/gcp_variant_transforms/beam_io/vcf_file_size_io.py
new file mode 100644
index 000000000..d6f8f0c1e
--- /dev/null
+++ b/gcp_variant_transforms/beam_io/vcf_file_size_io.py
@@ -0,0 +1,225 @@
+# Copyright 2018 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A source for estimating the size of VCF files when processed by vcf_to_bq."""
+
+from __future__ import absolute_import
+
+from typing import Iterable, List, Tuple  # pylint: disable=unused-import
+import logging
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam import transforms
+from apache_beam.io import filebasedsource
+from apache_beam.io import range_trackers  # pylint: disable=unused-import
+from apache_beam.io import filesystem
+from apache_beam.io import filesystems
+from apache_beam.io import iobase
+
+from gcp_variant_transforms.beam_io import vcfio
+
+
+def _get_file_sizes(file_pattern):
+  # type: (str) -> List[FileSizeInfo]
+  file_sizes = []
+  match_result = filesystems.FileSystems.match([file_pattern])[0]
+  for file_metadata in match_result.metadata_list:
+    compression_type = filesystem.CompressionTypes.detect_compression_type(
+        file_metadata.path)
+    if compression_type != filesystem.CompressionTypes.UNCOMPRESSED:
+      logging.error("VCF file %s is compressed; disk requirement estimator "
+                    "will not be accurate.", file_metadata.path)
+    file_sizes.append((file_metadata.path, file_metadata.size_in_bytes,))
+  return file_sizes
+
+
+def _convert_variants_to_bytesize(variant):
+  # type: (vcfio.Variant) -> int
+  return coders.registry.get_coder(vcfio.Variant).estimate_size(variant)
+
+
+class FileSizeInfo(object):
+  def __init__(self, name, raw_file_size, encoded_file_size=None):
+    # type: (str, int, int) -> None
+    self.name = name
+    self.raw_size = raw_file_size
+    self.encoded_size = encoded_file_size  # Optional, useful for SumFn.
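+
+  # A worked example of the arithmetic used below (numbers are illustrative
+  # only): if a 100 MB VCF file yields a sampled snippet of 5 KB of raw text
+  # that grows to 15 KB once encoded as `vcfio.Variant` objects, the file's
+  # encoded size is estimated as 100 MB * (15 KB / 5 KB) = 300 MB.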
+ + def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size): + # type: (int, int) -> None + """Estimates a VCF file's encoded (byte) size by analyzing sample Variants. + + Given the raw_file_size and measurements of several VCF lines from the file, + estimate how much disk the file will take after expansion due to encoding + lines as `vcfio.Variant` objects. The encoded_sample_size will be set as + `self.encoded`. + + This is a simple ratio problem, solving for encoded_sample_size which is + the only unknown: + encoded_sample_size / raw_sample_size = encoded_file_size / raw_file_size + """ + if raw_sample_size == 0: + # Propagate in-band error state to avoid divide-by-zero. + logging.warning("File %s appears to have no valid Variant lines. File " + "will be ignored for size estimation.", self.name) + self.encoded_size = 0 + self.raw_size = 0 + else: + self.encoded_size = (self.raw_size * encoded_sample_size / + raw_sample_size) + + +class FileSizeInfoSumFn(beam.CombineFn): + """Combiner Function to sum up the size fields of FileSizeInfo objects. + + Unlike VariantsSizeInfoSumFn, the input is a PTable mapping str to + FileSizeInfo, so the input is a tuple with the FileSizeInfos as the second + field. The output strips out the str key which represents the file path. + + Example: [FileSizeInfo(a, b), FileSizeInfo(c, d)] -> FileSizeInfo(a+c, b+d) + """ + def create_accumulator(self): + # type: (None) -> Tuple[int, int] + return (0, 0) # (raw, encoded) sums + + def add_input(self, (raw, encoded), file_size_info): + # type: (Tuple[int, int], FileSizeInfo) -> Tuple[int, int] + return raw + file_size_info.raw_size, encoded + file_size_info.encoded_size + + def merge_accumulators(self, accumulators): + # type: (Iterable[Tuple[int, int]]) -> Tuple[int, int] + raw, encoded = zip(*accumulators) + return sum(raw), sum(encoded) + + def extract_output(self, (raw, encoded)): + # type: (Tuple[int, int]) -> FileSizeInfo + return FileSizeInfo("cumulative", raw, encoded) + + +class _EstimateVcfSizeSource(filebasedsource.FileBasedSource): + """A source for estimating the encoded size of a VCF file in `vcf_to_bq`. + + This source first reads a limited number of variants from a set of VCF files, + then + + Lines that are malformed are skipped. + + Parses VCF files (version 4) using PyVCF library. + """ + + DEFAULT_VCF_READ_BUFFER_SIZE = 65536 # 64kB + + def __init__(self, + file_pattern, + sample_size, + compression_type=filesystem.CompressionTypes.AUTO, + validate=True, + vcf_parser_type=vcfio.VcfParserType.PYVCF): + # type: (str, int, str, bool, vcfio.VcfParserType) -> None + super(_EstimateVcfSizeSource, self).__init__( + file_pattern, + compression_type=compression_type, + validate=validate, + splittable=False) + self._compression_type = compression_type + self._sample_size = sample_size + self._vcf_parser_type = vcf_parser_type + + def read_records( + self, + file_name, # type: str + range_tracker # type: range_trackers.UnsplittableRangeTracker + ): + # type: (...) 
-> Iterable[Tuple[str, str, vcfio.Variant]]
+    """This "generator" only emits a single FileSizeInfo object per file."""
+    vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
+    record_iterator = vcf_parser_class(
+        file_name,
+        range_tracker,
+        self._pattern,
+        self._compression_type,
+        allow_malformed_records=True,
+        representative_header_lines=None,
+        buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
+        skip_header_lines=0)
+
+    _, raw_file_size = _get_file_sizes(file_name)[0]
+
+    # Open distinct channel to read lines as raw bytestrings.
+    with filesystems.FileSystems.open(file_name,
+                                      self._compression_type) as raw_reader:
+      raw_record = raw_reader.readline()
+      while raw_record and raw_record.startswith('#'):
+        # Skip headers, assume header size is negligible.
+        raw_record = raw_reader.readline()
+
+      count, raw_size, encoded_size = 0, 0, 0
+      for encoded_record in record_iterator:
+        if count >= self._sample_size:
+          break
+        if not isinstance(encoded_record, vcfio.Variant):
+          logging.error(
+              "Skipping VCF line that could not be decoded as a "
+              "`vcfio.Variant` in file %s: %s", file_name, raw_record)
+          continue
+
+        raw_size += len(raw_record)
+        encoded_size += _convert_variants_to_bytesize(encoded_record)
+        count += 1
+
+        raw_record = raw_reader.readline()  # Increment raw iterator.
+      file_size_info = FileSizeInfo(file_name, raw_file_size)
+      file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
+      yield file_size_info
+
+
+class EstimateVcfSize(transforms.PTransform):
+  """A PTransform for reading a limited number of lines from a set of VCF files.
+
+  Output will be a PTable mapping from `file names -> Tuple[(line, Variant)]`
+  objects. The list contains the first `sample_size` number of lines that are
+  not malformed, first as a raw string and then encoded as a `Variant` class.
+
+  Parses VCF files (version 4) using PyVCF library.
+  """
+
+  def __init__(
+      self,
+      file_pattern,  # type: str
+      sample_size,  # type: int
+      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
+      validate=True,  # type: bool
+      **kwargs  # type: **str
+      ):
+    # type: (...) -> None
+    """Initialize the :class:`EstimateVcfSize` transform.
+
+    Args:
+      file_pattern: The file path to read from either as a single file or a
+        glob pattern.
+      sample_size: The number of lines that should be read from the file.
+      compression_type: Used to handle compressed input files.
+        Typical value is :attr:`CompressionTypes.AUTO
+        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+        underlying file_path's extension will be used to detect the
+        compression.
+      validate: Flag to verify that the files exist during the pipeline
+        creation time.
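+
+    A usage sketch (the glob is hypothetical; both the transform and the
+    combiner are defined in this module):
+
+      file_size_infos = (p
+                         | EstimateVcfSize('gs://my-bucket/*.vcf', 50)
+                         | beam.CombineGlobally(FileSizeInfoSumFn()))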
+ """ + super(EstimateVcfSize, self).__init__(**kwargs) + self._source = _EstimateVcfSizeSource( + file_pattern, sample_size, compression_type, validate=validate) + + def expand(self, pvalue): + return pvalue.pipeline | iobase.Read(self._source) diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io_test.py b/gcp_variant_transforms/beam_io/vcf_file_size_io_test.py new file mode 100644 index 000000000..e69de29bb diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py index 579558255..d633b42d5 100644 --- a/gcp_variant_transforms/beam_io/vcfio.py +++ b/gcp_variant_transforms/beam_io/vcfio.py @@ -19,7 +19,7 @@ from __future__ import absolute_import -from typing import Any, Iterable, List, Tuple # pylint: disable=unused-import +from typing import Any, Iterable, List, Tuple, Type # pylint: disable=unused-import from functools import partial import enum @@ -29,9 +29,9 @@ from apache_beam.io import filesystems from apache_beam.io import range_trackers # pylint: disable=unused-import from apache_beam.io import textio -from apache_beam.io.filesystem import CompressionTypes -from apache_beam.io.iobase import Read -from apache_beam.transforms import PTransform +from apache_beam.io import filesystem +from apache_beam.io import iobase +from apache_beam import transforms from gcp_variant_transforms.beam_io import vcf_parser @@ -55,6 +55,16 @@ class VcfParserType(enum.Enum): PYVCF = 0 NUCLEUS = 1 +def get_vcf_parser(vcf_parser_type): + # type: (VcfParserType) -> Type[vcf_parser.VcfParser] + if vcf_parser_type == VcfParserType.PYVCF: + return vcf_parser.PyVcfParser + elif vcf_parser_type == VcfParserType.NUCLEUS: + return vcf_parser.NucleusParser + else: + raise ValueError( + 'Unrecognized _vcf_parser_type: %s.' % str(vcf_parser_type)) + class _ToVcfRecordCoder(coders.Coder): """Coder for encoding :class:`Variant` objects as VCF text lines.""" @@ -192,7 +202,7 @@ class _VcfSource(filebasedsource.FileBasedSource): def __init__(self, file_pattern, # type: str representative_header_lines=None, # type: List[str] - compression_type=CompressionTypes.AUTO, # type: str + compression_type=filesystem.CompressionTypes.AUTO, # type: str buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE, # type: int validate=True, # type: bool allow_malformed_records=False, # type: bool @@ -213,14 +223,7 @@ def read_records(self, range_tracker # type: range_trackers.OffsetRangeTracker ): # type: (...) -> Iterable[MalformedVcfRecord] - vcf_parser_class = None - if self._vcf_parser_type == VcfParserType.PYVCF: - vcf_parser_class = vcf_parser.PyVcfParser - elif self._vcf_parser_type == VcfParserType.NUCLEUS: - vcf_parser_class = vcf_parser.NucleusParser - else: - raise ValueError( - 'Unrecognized _vcf_parser_type: %s.' % str(self._vcf_parser_type)) + vcf_parser_class = get_vcf_parser(self._vcf_parser_type) record_iterator = vcf_parser_class( file_name, range_tracker, @@ -235,7 +238,8 @@ def read_records(self, for record in record_iterator: yield record -class ReadFromVcf(PTransform): + +class ReadFromVcf(transforms.PTransform): """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading VCF files. 
@@ -249,7 +253,7 @@ def __init__( self, file_pattern=None, # type: str representative_header_lines=None, # type: List[str] - compression_type=CompressionTypes.AUTO, # type: str + compression_type=filesystem.CompressionTypes.AUTO, # type: str validate=True, # type: bool allow_malformed_records=False, # type: bool vcf_parser_type=VcfParserType.PYVCF, # type: int @@ -280,7 +284,7 @@ def __init__( vcf_parser_type=vcf_parser_type) def expand(self, pvalue): - return pvalue.pipeline | Read(self._source) + return pvalue.pipeline | iobase.Read(self._source) def _create_vcf_source( @@ -292,7 +296,7 @@ def _create_vcf_source( allow_malformed_records=allow_malformed_records) -class ReadAllFromVcf(PTransform): +class ReadAllFromVcf(transforms.PTransform): """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading a :class:`~apache_beam.pvalue.PCollection` of VCF files. @@ -310,7 +314,7 @@ def __init__( self, representative_header_lines=None, # type: List[str] desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, # type: int - compression_type=CompressionTypes.AUTO, # type: str + compression_type=filesystem.CompressionTypes.AUTO, # type: str allow_malformed_records=False, # type: bool **kwargs # type: **str ): @@ -339,7 +343,7 @@ def __init__( allow_malformed_records=allow_malformed_records) self._read_all_files = filebasedsource.ReadAllFiles( True, # splittable - CompressionTypes.AUTO, desired_bundle_size, + filesystem.CompressionTypes.AUTO, desired_bundle_size, 0, # min_bundle_size source_from_file) @@ -347,13 +351,13 @@ def expand(self, pvalue): return pvalue | 'ReadAllFiles' >> self._read_all_files -class WriteToVcf(PTransform): +class WriteToVcf(transforms.PTransform): """A PTransform for writing to VCF files.""" def __init__(self, file_path, num_shards=1, - compression_type=CompressionTypes.AUTO, + compression_type=filesystem.CompressionTypes.AUTO, headers=None): # type: (str, int, str, List[str]) -> None """Initialize a WriteToVcf PTransform. @@ -404,7 +408,7 @@ def process(self, (file_path, variants), *args, **kwargs): file_to_write.write(self._coder.encode(variant)) -class WriteVcfDataLines(PTransform): +class WriteVcfDataLines(transforms.PTransform): """A PTransform for writing VCF data lines. This PTransform takes PCollection<`file_path`, `variants`> as input, and diff --git a/gcp_variant_transforms/libs/preprocess_reporter.py b/gcp_variant_transforms/libs/preprocess_reporter.py index 7870647d4..07ee5d949 100644 --- a/gcp_variant_transforms/libs/preprocess_reporter.py +++ b/gcp_variant_transforms/libs/preprocess_reporter.py @@ -26,6 +26,8 @@ TODO(allieychen): Eventually, it also contains the resource estimation. Output example (assuming opening in spreedsheet): +Estimated disk usage by Dataflow: 4846.0 GB +Total raw file sizes: 1231.0 GB Header Conflicts ID Category Conflicts File Paths Proposed Resolution NS INFO num=1 type=Float file1 num=1 type=Float @@ -43,13 +45,16 @@ File Path Variant Record Error Message file 1 rs6 G A 29 PASS NS=3; invalid literal for int() with base 10. 
""" +import math from typing import Dict, List, Optional, Union # pylint: disable=unused-import from apache_beam.io import filesystems from gcp_variant_transforms.beam_io import vcfio # pylint: disable=unused-import -from gcp_variant_transforms.beam_io import vcf_header_io +from gcp_variant_transforms.beam_io import vcf_header_io # pylint: disable=unused-import +from gcp_variant_transforms.beam_io import vcf_file_size_io # pylint: disable=unused-import +from gcp_variant_transforms.beam_io.vcf_header_io import VcfParserHeaderKeyConstants from gcp_variant_transforms.libs import vcf_header_definitions_merger # pylint: disable=unused-import # An alias for the header key constants to make referencing easier. @@ -78,6 +83,7 @@ class _HeaderLine(object): def generate_report( header_definitions, # type: _VcfHeaderDefinitions file_path, # type: str + disk_usage_estimate, # type: vcf_file_size_io.FileSizeInfo resolved_headers=None, # type: vcf_header_io.VcfHeader inferred_headers=None, # type: vcf_header_io.VcfHeader malformed_records=None # type: List[vcfio.MalformedVcfRecord] @@ -89,6 +95,9 @@ def generate_report( header_definitions: The container which contains all header definitions and the corresponding file names. file_path: The location where the report is saved. + disk_usage_estimate: `FileSizeInfo` with metadata about the input files' + sizes in both raw and Beam-encoded formats. Can be set to `None` if no + estimate was made. resolved_headers: The `VcfHeader` that provides the resolutions for the fields that have conflicting definitions. inferred_headers: The `VcfHeader` that contains the inferred header @@ -98,6 +107,7 @@ def generate_report( """ resolved_headers = resolved_headers or vcf_header_io.VcfHeader() with filesystems.FileSystems.create(file_path) as file_to_write: + _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate) _append_conflicting_headers_to_report(file_to_write, header_definitions, resolved_headers) _append_inferred_headers_to_report(file_to_write, inferred_headers) @@ -276,6 +286,17 @@ def _format_definition(num_value, type_value): return ' '.join(formatted_definition) +def _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate): + # type: (file, vcf_file_size_io.FileSizeInfo) -> None + if disk_usage_estimate is None: + return + file_to_write.write( + 'Estimated disk usage by Dataflow: {} GB\n' + 'Total raw file sizes: {} GB\n'.format( + int(math.ceil(disk_usage_estimate.encoded_size / 1e9)), + int(math.ceil(disk_usage_estimate.raw_size / 1e9)))) + + def _append_to_report(file_to_write, error_type, header, contents): # type: (file, str, str, List[str]) -> None """Appends the contents to `file_to_write`. 
diff --git a/gcp_variant_transforms/libs/preprocess_reporter_test.py b/gcp_variant_transforms/libs/preprocess_reporter_test.py index f113bf52d..4ec58d569 100644 --- a/gcp_variant_transforms/libs/preprocess_reporter_test.py +++ b/gcp_variant_transforms/libs/preprocess_reporter_test.py @@ -18,10 +18,11 @@ from typing import List # pylint: disable=unused-import import unittest -from apache_beam.io.filesystems import FileSystems +from apache_beam.io import filesystems from vcf.parser import _Format as Format from vcf.parser import _Info as Info +from gcp_variant_transforms.beam_io import vcf_file_size_io from gcp_variant_transforms.beam_io import vcfio from gcp_variant_transforms.beam_io.vcf_header_io import VcfHeader from gcp_variant_transforms.libs import preprocess_reporter @@ -39,18 +40,21 @@ def _generate_report_and_assert_contents_equal( header_definitions, # type: VcfHeaderDefinitions resolved_headers=None, # type: VcfHeader inferred_headers=None, # type: VcfHeader - malformed_records=None # type: List[vcfio.MalformedVcfRecord] + malformed_records=None, # type: List[vcfio.MalformedVcfRecord] + disk_usage_estimate=None, # type: vcf_file_size_io.FileSizeInfo ): # type: (...) -> None with temp_dir.TempDir() as tempdir: - file_path = FileSystems.join(tempdir.get_path(), - PreprocessReporterTest._REPORT_NAME) + file_path = filesystems.FileSystems.join( + tempdir.get_path(), + PreprocessReporterTest._REPORT_NAME) preprocess_reporter.generate_report(header_definitions, file_path, + disk_usage_estimate, resolved_headers, inferred_headers, malformed_records) - with FileSystems.open(file_path) as f: + with filesystems.FileSystems.open(file_path) as f: reader = f.readlines() self.assertEqual(reader, expected_content) @@ -69,6 +73,31 @@ def test_report_no_conflicts(self): header_definitions, resolved_headers) + def test_report_with_disk_estimate(self): + header_definitions = merge_header_definitions.VcfHeaderDefinitions() + header_definitions._infos = {'NS': {Definition(1, 'Float'): ['file1']}} + header_definitions._formats = {'NS': {Definition(1, 'Float'): ['file2']}} + + infos = OrderedDict([ + ('NS', Info('NS', 1, 'Integer', 'Number samples', None, None))]) + formats = OrderedDict([('NS', Format('NS', 1, 'Float', 'Number samples'))]) + resolved_headers = VcfHeader(infos=infos, formats=formats) + + file_size_info = vcf_file_size_io.FileSizeInfo( + 'unused_name', + raw_file_size=int(1e10), + encoded_file_size=int(2e10)) + + expected = ['Estimated disk usage by Dataflow: 20 GB\n', + 'Total raw file sizes: 10 GB\n', + 'No Header Conflicts Found.\n', + '\n'] + self._generate_report_and_assert_contents_equal( + expected, + header_definitions, + resolved_headers, + disk_usage_estimate=file_size_info) + def test_report_conflicts(self): header_definitions = VcfHeaderDefinitions() header_definitions._infos = {'NS': {Definition(1, 'Integer'): ['file1'], diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 473ef6348..566fc6d75 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -503,6 +503,13 @@ def add_arguments(self, parser): help=('The full path of the resolved headers. The file will not be' 'generated if unspecified. 
Otherwise, please provide a local '
               'path if run locally, or a cloud path if run on Dataflow.'))
+    parser.add_argument(
+        '--estimate_disk_usage',
+        type='bool', default=False, nargs='?', const=True,
+        help=('By default, disk resource usage will not be estimated. '
+              'If true, the preprocessor will estimate the maximum disk usage '
+              'consumed at any step in the pipeline, which could lead to '
+              'out-of-disk errors at a shuffle step, e.g. MergeVariants.'))
 
   def validate(self, parsed_args):
     _validate_inputs(parsed_args)
diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py
index 05c644ebe..6830ba208 100644
--- a/gcp_variant_transforms/vcf_to_bq_preprocess.py
+++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-r"""Pipeline for preprocessing the VCF files.
+"""Pipeline for preprocessing the VCF files.
 
 This pipeline is aimed to help the user to easily identify and further import
 the malformed/incompatible VCF files to BigQuery. It generates two files as the
@@ -56,6 +56,7 @@
 from apache_beam.options import pipeline_options
 
 from gcp_variant_transforms import pipeline_common
+from gcp_variant_transforms.beam_io import vcf_file_size_io
 from gcp_variant_transforms.beam_io import vcfio
 from gcp_variant_transforms.libs import preprocess_reporter
 from gcp_variant_transforms.options import variant_transform_options
@@ -66,6 +67,8 @@
 
 _COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions]
 
+# Number of lines from each VCF that should be read when estimating disk usage.
+_SNIPPET_READ_SIZE = 50
 
 def _get_inferred_headers(variants,  # type: pvalue.PCollection
                           merged_header  # type: pvalue.PCollection
@@ -73,7 +76,7 @@ def _get_inferred_headers(variants,  # type: pvalue.PCollection
   # type: (...) -> (pvalue.PCollection, pvalue.PCollection)
   inferred_headers = (variants
                       | 'FilterVariants' >> filter_variants.FilterVariants()
-                      | ' InferHeaderFields' >>
+                      | 'InferHeaderFields' >>
                       infer_headers.InferHeaderFields(
                           pvalue.AsSingleton(merged_header),
                           allow_incompatible_records=True,
@@ -87,6 +90,24 @@ def _get_inferred_headers(variants,  # type: pvalue.PCollection
   return inferred_headers, merged_header
 
 
+# TODO(hanjohn): Add an e2e test
+def _estimate_disk_resources(p, input_pattern):
+  # type: (pvalue.PCollection, str) -> (pvalue.PCollection)
+  # TODO(hanjohn): Add support for `ReadAll` pattern for inputs with very large
+  # numbers of files.
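+  # The intended shape of this helper (a sketch of the flow below):
+  #   Create([input_pattern]) feeds EstimateVcfSize, which emits one
+  #   FileSizeInfo per matched file; FileSizeInfoSumFn then folds them
+  #   into a single cumulative FileSizeInfo.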
+ result = ( + p + | 'InputFilePattern' >> beam.Create([input_pattern]) + | 'ReadFileSizeAndSampleVariants' >> vcf_file_size_io.EstimateVcfSize( + input_pattern, _SNIPPET_READ_SIZE) + | 'SumFileSizeEstimates' >> beam.CombineGlobally( + vcf_file_size_io.FileSizeInfoSumFn())) + result | ('PrintEstimate' >> # pylint: disable=expression-not-assigned + beam.Map(lambda x: logging.info( + "Final estimate of encoded size: %d GB", x.encoded_size / 1e9))) + return result + + def run(argv=None): # type: (List[str]) -> (str, str) """Runs preprocess pipeline.""" @@ -103,6 +124,11 @@ def run(argv=None): merged_definitions = (headers | 'MergeDefinitions' >> merge_header_definitions.MergeDefinitions()) + + disk_usage_estimate = None + if known_args.estimate_disk_usage: + disk_usage_estimate = beam.pvalue.AsSingleton( + _estimate_disk_resources(p, known_args.input_pattern)) if known_args.report_all_conflicts: if len(all_patterns) == 1: variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( @@ -120,6 +146,7 @@ def run(argv=None): | 'GenerateConflictsReport' >> beam.ParDo(preprocess_reporter.generate_report, known_args.report_path, + disk_usage_estimate, beam.pvalue.AsSingleton(merged_headers), beam.pvalue.AsSingleton(inferred_headers), beam.pvalue.AsIter(malformed_records))) @@ -128,6 +155,7 @@ def run(argv=None): | 'GenerateConflictsReport' >> beam.ParDo(preprocess_reporter.generate_report, known_args.report_path, + disk_usage_estimate, beam.pvalue.AsSingleton(merged_headers))) if known_args.resolved_headers_path: From 4ccd72d51d70de2cce18fa196d132978d4158946 Mon Sep 17 00:00:00 2001 From: John Han Date: Sun, 10 Feb 2019 22:49:25 -0800 Subject: [PATCH 2/4] Clean up disk_estimator code and style per PR comments. --- .../beam_io/vcf_file_size_io.py | 62 +++++++++++-------- .../libs/preprocess_reporter.py | 4 +- .../libs/preprocess_reporter_test.py | 3 +- .../vcf_to_bq_preprocess.py | 6 +- 4 files changed, 42 insertions(+), 33 deletions(-) diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io.py b/gcp_variant_transforms/beam_io/vcf_file_size_io.py index d6f8f0c1e..a26e58391 100644 --- a/gcp_variant_transforms/beam_io/vcf_file_size_io.py +++ b/gcp_variant_transforms/beam_io/vcf_file_size_io.py @@ -17,6 +17,7 @@ from __future__ import absolute_import from typing import Iterable, List, Tuple # pylint: disable=unused-import +import itertools import logging import apache_beam as beam @@ -31,18 +32,21 @@ from gcp_variant_transforms.beam_io import vcfio -def _get_file_sizes(file_pattern): +def _get_file_size(file_name): # type: (str) -> List[FileSizeInfo] - file_sizes = [] - match_result = filesystems.FileSystems.match([file_pattern])[0] - for file_metadata in match_result.metadata_list: - compression_type = filesystem.CompressionTypes.detect_compression_type( - file_metadata.path) - if compression_type != filesystem.CompressionTypes.UNCOMPRESSED: - logging.error("VCF file %s is compressed; disk requirement estimator " - "will not be accurate.", file_metadata.path) - file_sizes.append((file_metadata.path, file_metadata.size_in_bytes,)) - return file_sizes + match_result = filesystems.FileSystems.match([file_name])[0] + if len(match_result.metadata_list) != 1: + raise IOError("File name {} did not correspond to exactly 1 result. 
" + "Instead, got {}.".format(file_name, + len(match_result.metadata_list))) + file_metadata = match_result.metadata_list[0] + + compression_type = filesystem.CompressionTypes.detect_compression_type( + file_metadata.path) + if compression_type != filesystem.CompressionTypes.UNCOMPRESSED: + logging.error("VCF file %s is compressed; disk requirement estimator " + "will not be accurate.", file_metadata.path) + return file_metadata.size_in_bytes def _convert_variants_to_bytesize(variant): @@ -64,7 +68,7 @@ def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size): Given the raw_file_size and measurements of several VCF lines from the file, estimate how much disk the file will take after expansion due to encoding lines as `vcfio.Variant` objects. The encoded_sample_size will be set as - `self.encoded`. + `self.encoded_size`. This is a simple ratio problem, solving for encoded_sample_size which is the only unknown: @@ -111,8 +115,11 @@ def extract_output(self, (raw, encoded)): class _EstimateVcfSizeSource(filebasedsource.FileBasedSource): """A source for estimating the encoded size of a VCF file in `vcf_to_bq`. - This source first reads a limited number of variants from a set of VCF files, - then + This source first obtains the raw file sizes of a set of VCF files. Then, + the source reads a limited number of variants from a set of VCF files, + both as raw strings and encoded `Variant` objects. Finally, the reader + returns a single `FileSizeInfo` object with an estimate of the input size + if all sizes had been encoded as `Variant` objects. Lines that are malformed are skipped. @@ -142,7 +149,7 @@ def read_records( file_name, # type: str range_tracker # type: range_trackers.UnsplittableRangeTracker ): - # type: (...) -> Iterable[Tuple[str, str, vcfio.Variant]] + # type: (...) -> Iterable[FileSizeInfo] """This "generator" only emits a single FileSizeInfo object per file.""" vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type) record_iterator = vcf_parser_class( @@ -155,18 +162,20 @@ def read_records( buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE, skip_header_lines=0) - _, raw_file_size = _get_file_sizes(file_name)[0] + raw_file_size = _get_file_size(file_name) # Open distinct channel to read lines as raw bytestrings. with filesystems.FileSystems.open(file_name, - self._compression_type) as raw_reader: - raw_record = raw_reader.readline() - while raw_record and raw_record.startswith('#'): - # Skip headers, assume header size is negligible. - raw_record = raw_reader.readline() - + self._compression_type) as raw_iterator: count, raw_size, encoded_size = 0, 0, 0 - for encoded_record in record_iterator: + for encoded_record, raw_record in itertools.izip(record_iterator, + raw_iterator): + while raw_record and raw_record.startswith('#'): + # Skip headers. Assume that header size is negligible. + raw_record = raw_iterator.next() + logging.debug( + "Reading record for disk usage estimation. Encoded variant: %s\n" + "Raw variant: %s", encoded_record, raw_record) if count >= self._sample_size: break if not isinstance(encoded_record, vcfio.Variant): @@ -174,12 +183,13 @@ def read_records( "Skipping VCF line that could not be decoded as a " "`vcfio.Variant` in file %s: %s", file_name, raw_record) continue - - raw_size += len(raw_record) + # Encoding in `utf-8` should represent the string as one byte per char, + # even for non-ASCII chars. Python adds significant overhead to the + # bytesize of the full str object. 
+ raw_size += len(raw_record.encode('utf-8')) encoded_size += _convert_variants_to_bytesize(encoded_record) count += 1 - raw_record = raw_reader.readline() # Increment raw iterator. file_size_info = FileSizeInfo(file_name, raw_file_size) file_size_info.estimate_encoded_file_size(raw_size, encoded_size) yield file_size_info diff --git a/gcp_variant_transforms/libs/preprocess_reporter.py b/gcp_variant_transforms/libs/preprocess_reporter.py index 07ee5d949..94bf2036d 100644 --- a/gcp_variant_transforms/libs/preprocess_reporter.py +++ b/gcp_variant_transforms/libs/preprocess_reporter.py @@ -45,6 +45,7 @@ File Path Variant Record Error Message file 1 rs6 G A 29 PASS NS=3; invalid literal for int() with base 10. """ +import logging import math from typing import Dict, List, Optional, Union # pylint: disable=unused-import @@ -54,7 +55,6 @@ from gcp_variant_transforms.beam_io import vcfio # pylint: disable=unused-import from gcp_variant_transforms.beam_io import vcf_header_io # pylint: disable=unused-import from gcp_variant_transforms.beam_io import vcf_file_size_io # pylint: disable=unused-import -from gcp_variant_transforms.beam_io.vcf_header_io import VcfParserHeaderKeyConstants from gcp_variant_transforms.libs import vcf_header_definitions_merger # pylint: disable=unused-import # An alias for the header key constants to make referencing easier. @@ -290,6 +290,8 @@ def _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate): # type: (file, vcf_file_size_io.FileSizeInfo) -> None if disk_usage_estimate is None: return + logging.info("Final estimate of encoded size: %d GB", + disk_usage_estimate.encoded_size / 1e9) file_to_write.write( 'Estimated disk usage by Dataflow: {} GB\n' 'Total raw file sizes: {} GB\n'.format( diff --git a/gcp_variant_transforms/libs/preprocess_reporter_test.py b/gcp_variant_transforms/libs/preprocess_reporter_test.py index 4ec58d569..202c47dd1 100644 --- a/gcp_variant_transforms/libs/preprocess_reporter_test.py +++ b/gcp_variant_transforms/libs/preprocess_reporter_test.py @@ -74,7 +74,7 @@ def test_report_no_conflicts(self): resolved_headers) def test_report_with_disk_estimate(self): - header_definitions = merge_header_definitions.VcfHeaderDefinitions() + header_definitions = VcfHeaderDefinitions() header_definitions._infos = {'NS': {Definition(1, 'Float'): ['file1']}} header_definitions._formats = {'NS': {Definition(1, 'Float'): ['file2']}} @@ -82,7 +82,6 @@ def test_report_with_disk_estimate(self): ('NS', Info('NS', 1, 'Integer', 'Number samples', None, None))]) formats = OrderedDict([('NS', Format('NS', 1, 'Float', 'Number samples'))]) resolved_headers = VcfHeader(infos=infos, formats=formats) - file_size_info = vcf_file_size_io.FileSizeInfo( 'unused_name', raw_file_size=int(1e10), diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index 6830ba208..015eadee2 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Pipeline for preprocessing the VCF files. +r"""Pipeline for preprocessing the VCF files. This pipeline is aimed to help the user to easily identify and further import the malformed/incompatible VCF files to BigQuery. 
It generates two files as the @@ -68,6 +68,7 @@ _COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions] # Number of lines from each VCF that should be read when estimating disk usage. +# TODO(hanjohn): Convert this field to a flag. _SNIPPET_READ_SIZE = 50 def _get_inferred_headers(variants, # type: pvalue.PCollection @@ -102,9 +103,6 @@ def _estimate_disk_resources(p, input_pattern): input_pattern, _SNIPPET_READ_SIZE) | 'SumFileSizeEstimates' >> beam.CombineGlobally( vcf_file_size_io.FileSizeInfoSumFn())) - result | ('PrintEstimate' >> # pylint: disable=expression-not-assigned - beam.Map(lambda x: logging.info( - "Final estimate of encoded size: %d GB", x.encoded_size / 1e9))) return result From 68c8dada2c0ebf9e7d9faf61780aacae0d566440 Mon Sep 17 00:00:00 2001 From: John Han Date: Mon, 13 May 2019 02:53:11 -0700 Subject: [PATCH 3/4] Make final fixes to PR --- gcp_variant_transforms/beam_io/vcf_file_size_io.py | 14 +++++++------- gcp_variant_transforms/beam_io/vcfio.py | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io.py b/gcp_variant_transforms/beam_io/vcf_file_size_io.py index a26e58391..826ad65d3 100644 --- a/gcp_variant_transforms/beam_io/vcf_file_size_io.py +++ b/gcp_variant_transforms/beam_io/vcf_file_size_io.py @@ -17,29 +17,29 @@ from __future__ import absolute_import from typing import Iterable, List, Tuple # pylint: disable=unused-import -import itertools import logging +import itertools import apache_beam as beam from apache_beam import coders from apache_beam import transforms from apache_beam.io import filebasedsource -from apache_beam.io import range_trackers # pylint: disable=unused-import from apache_beam.io import filesystem from apache_beam.io import filesystems from apache_beam.io import iobase +from apache_beam.io import range_trackers # pylint: disable=unused-import from gcp_variant_transforms.beam_io import vcfio def _get_file_size(file_name): # type: (str) -> List[FileSizeInfo] - match_result = filesystems.FileSystems.match([file_name])[0] - if len(match_result.metadata_list) != 1: + matched_files = filesystems.FileSystems.match([file_name])[0].metadata_list + if len(matched_files) != 1: raise IOError("File name {} did not correspond to exactly 1 result. 
" - "Instead, got {}.".format(file_name, - len(match_result.metadata_list))) - file_metadata = match_result.metadata_list[0] + "Instead, got {} matches.".format(file_name, + len(matched_files))) + file_metadata = matched_files[0] compression_type = filesystem.CompressionTypes.detect_compression_type( file_metadata.path) diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py index d633b42d5..04b9eb729 100644 --- a/gcp_variant_transforms/beam_io/vcfio.py +++ b/gcp_variant_transforms/beam_io/vcfio.py @@ -24,14 +24,14 @@ import enum import apache_beam as beam +from apache_beam import transforms from apache_beam.coders import coders from apache_beam.io import filebasedsource +from apache_beam.io import filesystem from apache_beam.io import filesystems +from apache_beam.io import iobase from apache_beam.io import range_trackers # pylint: disable=unused-import from apache_beam.io import textio -from apache_beam.io import filesystem -from apache_beam.io import iobase -from apache_beam import transforms from gcp_variant_transforms.beam_io import vcf_parser From 18c9807f8114e3267d141a51817d68a298901f5d Mon Sep 17 00:00:00 2001 From: John Han Date: Tue, 3 Sep 2019 16:49:22 -0700 Subject: [PATCH 4/4] Make fixes per tneymanov@ comments --- .../beam_io/vcf_file_size_io.py | 58 +++++++------------ .../libs/preprocess_reporter_test.py | 5 +- .../vcf_to_bq_preprocess.py | 25 ++------ 3 files changed, 27 insertions(+), 61 deletions(-) diff --git a/gcp_variant_transforms/beam_io/vcf_file_size_io.py b/gcp_variant_transforms/beam_io/vcf_file_size_io.py index 0222d2f97..ef093dcd2 100644 --- a/gcp_variant_transforms/beam_io/vcf_file_size_io.py +++ b/gcp_variant_transforms/beam_io/vcf_file_size_io.py @@ -32,6 +32,9 @@ from gcp_variant_transforms.beam_io import vcfio +# Number of lines from each VCF that should be read when estimating disk usage. +SNIPPET_READ_SIZE = 50 + def _get_file_size(file_name): # type: (str) -> List[FileSizeInfo] matched_files = filesystems.FileSystems.match([file_name])[0].metadata_list @@ -49,30 +52,17 @@ def _get_file_size(file_name): return file_metadata.size_in_bytes -def _convert_variants_to_bytesize(variant): - # type: (vcfio.Variant) -> int - return coders.registry.get_coder(vcfio.Variant).estimate_size(variant) - - class FileSizeInfo(object): - def __init__(self, name, raw_file_size, encoded_file_size=None): - # type: (str, int, int) -> None + def __init__(self, raw_size, encoded_size=None, name="[no filename]"): + # type: (int, int, str) -> None + self.raw_size = raw_size + self.encoded_size = encoded_size # Allow direct initialization self.name = name - self.raw_size = raw_file_size - self.encoded_size = encoded_file_size # Optional, useful for SumFn. def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size): # type: (int, int) -> None - """Estimates a VCF file's encoded (byte) size by analyzing sample Variants. - - Given the raw_file_size and measurements of several VCF lines from the file, - estimate how much disk the file will take after expansion due to encoding - lines as `vcfio.Variant` objects. The encoded_sample_size will be set as - `self.encoded_size`. - - This is a simple ratio problem, solving for encoded_sample_size which is - the only unknown: - encoded_sample_size / raw_sample_size = encoded_file_size / raw_file_size + """Estimate encoded file size, given the sizes for the raw file, sample raw + lines and sample encoded lines. 
""" if raw_sample_size == 0: # Propagate in-band error state to avoid divide-by-zero. @@ -86,13 +76,7 @@ def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size): class FileSizeInfoSumFn(beam.CombineFn): - """Combiner Function to sum up the size fields of FileSizeInfo objects. - - Unlike VariantsSizeInfoSumFn, the input is a PTable mapping str to - FileSizeInfo, so the input is a tuple with the FileSizeInfos as the second - field. The output strips out the str key which represents the file path. - - Example: [FileSizeInfo(a, b), FileSizeInfo(c, d)] -> FileSizeInfo(a+c, b+d) + """Combiner Function, used to sum up the size fields of FileSizeInfo objects. """ def create_accumulator(self): # type: (None) -> Tuple[int, int] @@ -109,7 +93,7 @@ def merge_accumulators(self, accumulators): def extract_output(self, (raw, encoded)): # type: (Tuple[int, int]) -> FileSizeInfo - return FileSizeInfo("cumulative", raw, encoded) + return FileSizeInfo(raw, encoded) class _EstimateVcfSizeSource(filebasedsource.FileBasedSource): @@ -122,8 +106,6 @@ class _EstimateVcfSizeSource(filebasedsource.FileBasedSource): if all sizes had been encoded as `Variant` objects. Lines that are malformed are skipped. - - Parses VCF files (version 4) using PyVCF library. """ DEFAULT_VCF_READ_BUFFER_SIZE = 65536 # 64kB @@ -187,22 +169,20 @@ def read_records( # even for non-ASCII chars. Python adds significant overhead to the # bytesize of the full str object. raw_size += len(raw_record.encode('utf-8')) - encoded_size += _convert_variants_to_bytesize(encoded_record) + encoded_size += coders.registry.get_coder(vcfio.Variant).estimate_size( + encoded_record) count += 1 - file_size_info = FileSizeInfo(file_name, raw_file_size) + file_size_info = FileSizeInfo(raw_file_size, name=file_name) file_size_info.estimate_encoded_file_size(raw_size, encoded_size) yield file_size_info class EstimateVcfSize(transforms.PTransform): - """A PTransform for reading a limited number of lines from a set of VCF files. - - Output will be a PTable mapping from `file names -> Tuple[(line, Variant)]` - objects. The list contains the first `sample_size` number of lines that are - not malformed, first as a raw string and then encoded as a `Variant` class. + """PTransform estimating encoded size of VCFs without reading whole files. - Parses VCF files (version 4) using PyVCF library. + Output is a PCollection with a single FileSizeInfo object representing the + aggregate encoded size estimate. 
""" def __init__( @@ -232,4 +212,6 @@ def __init__( file_pattern, sample_size, compression_type, validate=validate) def expand(self, pvalue): - return pvalue.pipeline | iobase.Read(self._source) + return (pvalue.pipeline + | iobase.Read(self._source) + | beam.CombineGlobally(FileSizeInfoSumFn())) diff --git a/gcp_variant_transforms/libs/preprocess_reporter_test.py b/gcp_variant_transforms/libs/preprocess_reporter_test.py index 202c47dd1..7ba52d6c0 100644 --- a/gcp_variant_transforms/libs/preprocess_reporter_test.py +++ b/gcp_variant_transforms/libs/preprocess_reporter_test.py @@ -83,9 +83,8 @@ def test_report_with_disk_estimate(self): formats = OrderedDict([('NS', Format('NS', 1, 'Float', 'Number samples'))]) resolved_headers = VcfHeader(infos=infos, formats=formats) file_size_info = vcf_file_size_io.FileSizeInfo( - 'unused_name', - raw_file_size=int(1e10), - encoded_file_size=int(2e10)) + raw_size=int(1e10), + encoded_size=int(2e10)) expected = ['Estimated disk usage by Dataflow: 20 GB\n', 'Total raw file sizes: 10 GB\n', diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index a6fe55b1a..39332fd76 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -66,10 +66,6 @@ _COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions] -# Number of lines from each VCF that should be read when estimating disk usage. -# TODO(hanjohn): Convert this field to a flag. -_SNIPPET_READ_SIZE = 50 - def _get_inferred_headers(variants, # type: pvalue.PCollection merged_header # type: pvalue.PCollection ): @@ -90,21 +86,6 @@ def _get_inferred_headers(variants, # type: pvalue.PCollection return inferred_headers, merged_header -# TODO(hanjohn): Add an e2e test -def _estimate_disk_resources(p, input_pattern): - # type: (pvalue.PCollection, str) -> (pvalue.PCollection) - # TODO(hanjohn): Add support for `ReadAll` pattern for inputs with very large - # numbers of files. - result = ( - p - | 'InputFilePattern' >> beam.Create([input_pattern]) - | 'ReadFileSizeAndSampleVariants' >> vcf_file_size_io.EstimateVcfSize( - input_pattern, _SNIPPET_READ_SIZE) - | 'SumFileSizeEstimates' >> beam.CombineGlobally( - vcf_file_size_io.FileSizeInfoSumFn())) - return result - - def run(argv=None): # type: (List[str]) -> (str, str) """Runs preprocess pipeline.""" @@ -124,8 +105,12 @@ def run(argv=None): disk_usage_estimate = None if known_args.estimate_disk_usage: + # TODO(hanjohn): Add an e2e test + # TODO(hanjohn): Add support for `ReadAll` pattern for inputs with very + # large numbers of files. disk_usage_estimate = beam.pvalue.AsSingleton( - _estimate_disk_resources(p, known_args.input_pattern)) + p | 'SampleAndEstimateFileSize' >> vcf_file_size_io.EstimateVcfSize( + known_args.input_pattern, vcf_file_size_io.SNIPPET_READ_SIZE)) if known_args.report_all_conflicts: variants = pipeline_common.read_variants(p, all_patterns,