
Commit 5f38e93 (parent: 666fa4a)

Address first iteration of comments.

4 files changed: 21 additions, 33 deletions


gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/valid_4_1_pysam.json

Lines changed: 0 additions & 23 deletions
This file was deleted.

gcp_variant_transforms/transforms/sample_info_to_bigquery.py

Lines changed: 9 additions & 4 deletions
@@ -24,10 +24,15 @@
 class ConvertSampleInfoToRow(beam.DoFn):
   """Extracts sample info from `VcfHeader` and converts it to a BigQuery row."""

-  def process(self, vcf_header, samples_span_multiple_files):
+  def __init__(self,
+               samples_span_multiple_files=False,  # type: bool
+              ):
+    self._samples_span_multiple_files = samples_span_multiple_files
+
+  def process(self, vcf_header):
     # type: (vcf_header_io.VcfHeader, bool) -> Dict[str, Union[int, str]]
     for sample in vcf_header.samples:
-      if samples_span_multiple_files:
+      if self._samples_span_multiple_files:
         sample_id = hashing_util.generate_unsigned_hash_code(
             [sample], max_hash_value=pow(2, 63))
       else:
@@ -45,7 +50,7 @@ def process(self, vcf_header, samples_span_multiple_files):
 class SampleInfoToBigQuery(beam.PTransform):
   """Writes sample info to BigQuery."""

-  def __init__(self, output_table_prefix, append=False,
+  def __init__(self, output_table_prefix, temp_location, append=False,
                samples_span_multiple_files=False):
     # type: (str, Dict[str, str], bool, bool) -> None
     """Initializes the transform.
@@ -60,7 +65,7 @@ def __init__(self, output_table_prefix, append=False,
     self._output_table = sample_info_table_schema_generator.compose_table_name(
         output_table_prefix, sample_info_table_schema_generator.TABLE_SUFFIX)
     self._append = append
-    self.samples_span_multiple_files = samples_span_multiple_files
+    self._samples_span_multiple_files = samples_span_multiple_files
     self._schema = sample_info_table_schema_generator.generate_schema()
     self._temp_location = temp_location
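For orientation, a minimal sketch (not part of this commit) of how the reworked DoFn is applied after this change: the flag is bound once at construction time, so `process` receives only the header. The `headers` PCollection and the step label here are hypothetical.

    import apache_beam as beam

    # `headers` is a hypothetical PCollection of VcfHeader objects.
    sample_rows = (
        headers
        | 'SampleInfoToRow' >> beam.ParDo(
            ConvertSampleInfoToRow(samples_span_multiple_files=True)))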

gcp_variant_transforms/vcf_to_bq.py

Lines changed: 10 additions & 5 deletions
@@ -384,7 +384,7 @@ def _run_annotation_pipeline(known_args, pipeline_args):
 def _create_sample_info_table(pipeline,  # type: beam.Pipeline
                               pipeline_mode,  # type: PipelineModes
                               known_args,  # type: argparse.Namespace,
-                              pipeline_args,  # type: List[str]
+                              temp_directory,  # str
                              ):
   # type: (...) -> None
   headers = pipeline_common.read_headers(
@@ -395,6 +395,7 @@ def _create_sample_info_table(pipeline,  # type: beam.Pipeline
   _ = (headers | 'SampleInfoToBigQuery' >>
        sample_info_to_bigquery.SampleInfoToBigQuery(
            known_args.output_table,
+           temp_directory,
            known_args.append,
            known_args.samples_span_multiple_files))
@@ -405,6 +406,8 @@ def run(argv=None):
   logging.info('Command: %s', ' '.join(argv or sys.argv))
   known_args, pipeline_args = pipeline_common.parse_args(argv,
                                                          _COMMAND_LINE_OPTIONS)
+  if known_args.output_table and '--temp_location' not in pipeline_args:
+    raise ValueError('--temp_location is required for BigQuery imports.')
   if known_args.auto_flags_experiment:
     _get_input_dimensions(known_args, pipeline_args)
@@ -480,8 +483,10 @@ def run(argv=None):
       num_partitions = 1

   if known_args.output_table:
-    options = pipeline_options.PipelineOptions(pipeline_args)
-    google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
+    temp_directory = pipeline_options.PipelineOptions(pipeline_args).view_as(
+        pipeline_options.GoogleCloudOptions).temp_location
+    if not temp_directory:
+      raise ValueError('--temp_location must be set when writing to BigQuery.')
     for i in range(num_partitions):
       table_suffix = ''
       if partitioner and partitioner.get_partition_name(i):
@@ -491,7 +496,7 @@ def run(argv=None):
            variant_to_bigquery.VariantToBigQuery(
                table_name,
                header_fields,
-               google_cloud_options.temp_location,
+               temp_directory,
                variant_merger,
                processed_variant_factory,
                append=known_args.append,
@@ -502,7 +507,7 @@ def run(argv=None):
                known_args.null_numeric_value_replacement)))
     if known_args.generate_sample_info_table:
       _create_sample_info_table(
-          pipeline, pipeline_mode, known_args, pipeline_args)
+          pipeline, pipeline_mode, known_args, temp_directory)

   if known_args.output_avro_path:
     # TODO(bashir2): Add an integration test that outputs to Avro files and
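The pattern introduced above, reading --temp_location out of the Beam pipeline options once and failing fast when it is absent, can be exercised on its own. A self-contained sketch, with a hypothetical bucket path:

    from apache_beam.options import pipeline_options

    pipeline_args = ['--temp_location', 'gs://my-bucket/temp']  # hypothetical
    temp_directory = pipeline_options.PipelineOptions(pipeline_args).view_as(
        pipeline_options.GoogleCloudOptions).temp_location
    if not temp_directory:
      raise ValueError('--temp_location must be set when writing to BigQuery.')
    print(temp_directory)  # prints: gs://my-bucket/temp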

setup.py

Lines changed: 2 additions & 1 deletion
@@ -42,7 +42,8 @@
     # Nucleus needs uptodate protocol buffer compiler (protoc).
     'protobuf>=3.6.1',
     'mmh3<2.6',
-    'google-cloud-storage',
+    # Refer to issue #528
+    'google-cloud-storage<1.23.0',
     'pyfarmhash',
     'pyyaml'
 ]
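A hypothetical sanity check, not part of the commit, confirming that an installed environment respects the new upper bound; it relies on pkg_resources from setuptools.

    import pkg_resources

    # The pin from setup.py: versions at or above 1.23.0 should not appear.
    installed = pkg_resources.get_distribution('google-cloud-storage').version
    assert (pkg_resources.parse_version(installed)
            < pkg_resources.parse_version('1.23.0')), installed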
