
Commit 482151b

Use new BigQuery sink.
1 parent c5e1e0f commit 482151b

11 files changed (+30, -79 lines)

cloudbuild_CI.yaml

Lines changed: 0 additions & 3 deletions
@@ -41,9 +41,6 @@ steps:
   - '--skip_build'
   - '--project ${PROJECT_ID}'
   - '--image_tag ${COMMIT_SHA}'
-  - '--run_unit_tests'
-  - '--run_preprocessor_tests'
-  - '--run_bq_to_vcf_tests'
   - '--run_all_tests'
   - '--test_name_prefix cloud-ci-'
   id: 'test-gcp-variant-transforms-docker'

docs/large_inputs.md

Lines changed: 4 additions & 25 deletions
@@ -14,7 +14,6 @@ Default settings:
   --worker_machine_type <default n1-standard-1> \
   --disk_size_gb <default 250> \
   --worker_disk_type <default PD> \
-  --num_bigquery_write_shards <default 1> \
   --partition_config_path <default None> \
 ```
 
@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
 output rather than just being specified once as in the VCF header), you
 typically need 3 to 4 times the total size of the raw VCF files.
 
-In addition, if [merging](variant_merging.md) or
-[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
+In addition, if [merging](variant_merging.md) is enabled, you may
 need more disk per worker (e.g. 500GB) as the same variants need to be
 aggregated together on one machine.
 
@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
 they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
 limitations.
 
-As a result, we recommend using SSDs if [merging](variant_merge.md) or
-[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
-operations require "shuffling" the data (i.e. redistributing the data among
-workers), which require significant disk I/O.
+As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
+this operation requires "shuffling" the data (i.e. redistributing the data
+among workers), which requires significant disk I/O.
 
 Set
 `--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
 to use SSDs.
 
-### `--num_bigquery_write_shards`
-
-Currently, the write operation to BigQuery in Dataflow is performed as a
-postprocessing step after the main transforms are done. As a workaround for
-BigQuery write limitations (more details
-[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
-we have added "sharding" when writing to BigQuery. This makes the data load
-to BigQuery significantly faster as it parallelizes the process and enables
-loading large (>5TB) data to BigQuery at once.
-
-As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
-any data that has more than 1 billion rows (after merging) or 1TB of final
-output. You may use a smaller number of write shards (e.g. 5) when using
-[partitioned output](#--partition_config_path) as each partition also acts as a
-"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
-fail as there is a maximum limit on the number of concurrent writes per table.
-
 ### `--partition_config_path`
 
 Partitioning the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
 As a result, we recommend setting the partition config for very large data
 where possible. Please see the [documentation](partitioning.md) for more
 details.
-

gcp_variant_transforms/options/variant_transform_options.py

Lines changed: 1 addition & 6 deletions
@@ -195,12 +195,7 @@ def add_arguments(self, parser):
     parser.add_argument(
         '--num_bigquery_write_shards',
         type=int, default=1,
-        help=('Before writing the final result to output BigQuery, the data is '
-              'sharded to avoid a known failure for very large inputs (issue '
-              '#199). Setting this flag to 1 will avoid this extra sharding.'
-              'It is recommended to use 20 for loading large inputs without '
-              'merging. Use a smaller value (2 or 3) if both merging and '
-              'optimize_for_large_inputs are enabled.'))
+        help=('This flag is deprecated and may be removed in future releases.'))
     parser.add_argument(
         '--null_numeric_value_replacement',
         type=int,
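
Note that the flag itself is kept so existing invocations keep parsing; only its help text changes, and its value is no longer passed to the BigQuery write transform (see the vcf_to_bq.py change below). A minimal standalone sketch of this deprecation pattern (not code from this repo):

    import argparse

    parser = argparse.ArgumentParser()
    # The option still parses, but downstream code no longer reads it.
    parser.add_argument(
        '--num_bigquery_write_shards',
        type=int, default=1,
        help='This flag is deprecated and may be removed in future releases.')
    args = parser.parse_args(['--num_bigquery_write_shards', '20'])
    assert args.num_bigquery_write_shards == 20  # accepted, then ignored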

gcp_variant_transforms/pipeline_common.py

Lines changed: 3 additions & 0 deletions
@@ -75,6 +75,9 @@ def parse_args(argv, command_line_options):
   if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
     known_args.all_patterns = _get_all_patterns(
         known_args.input_pattern, known_args.input_file)
+
+  # Enable new BQ sink experiment.
+  pipeline_args += ['--experiment', 'use_beam_bq_sink']
   return known_args, pipeline_args
 
 
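
The new sink is enabled through Beam's `use_beam_bq_sink` experiment, which `parse_args` now appends unconditionally. A hedged standalone sketch (the project name is a placeholder) of how that flag surfaces through Beam's standard options:

    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

    pipeline_args = ['--project', 'my-project']            # hypothetical user args
    pipeline_args += ['--experiment', 'use_beam_bq_sink']  # what parse_args adds

    options = PipelineOptions(pipeline_args)
    # Beam collects repeated --experiment flags into DebugOptions.experiments.
    assert 'use_beam_bq_sink' in (options.view_as(DebugOptions).experiments or [])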

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json

Lines changed: 0 additions & 1 deletion
@@ -9,7 +9,6 @@
     "worker_machine_type": "n1-standard-64",
     "max_num_workers": "64",
     "num_workers": "20",
-    "num_bigquery_write_shards": "20",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json

Lines changed: 0 additions & 2 deletions
@@ -9,7 +9,6 @@
     "worker_machine_type": "n1-standard-16",
     "max_num_workers": "20",
     "num_workers": "20",
-    "num_bigquery_write_shards": "2",
     "assertion_configs": [
      {
        "query": ["NUM_ROWS_QUERY"],
@@ -68,4 +67,3 @@
     ]
   }
 ]
-

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json

Lines changed: 0 additions & 1 deletion
@@ -7,7 +7,6 @@
     "worker_machine_type": "n1-standard-16",
     "max_num_workers": "20",
     "num_workers": "20",
-    "num_bigquery_write_shards": "20",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/transforms/sample_info_to_bigquery.py

Lines changed: 3 additions & 1 deletion
@@ -62,6 +62,7 @@ def __init__(self, output_table_prefix, append=False,
     self._append = append
     self.samples_span_multiple_files = samples_span_multiple_files
     self._schema = sample_info_table_schema_generator.generate_schema()
+    self._temp_location = temp_location
 
   def expand(self, pcoll):
     return (pcoll
@@ -76,4 +77,5 @@ def expand(self, pcoll):
                     beam.io.BigQueryDisposition.WRITE_APPEND
                     if self._append
                     else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
-                method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
+                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+                custom_gcs_temp_location=self._temp_location))
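
With the FILE_LOADS method, WriteToBigQuery stages temporary files in GCS and then issues BigQuery load jobs, which is why the transform now carries an explicit GCS temp location. A minimal standalone sketch of the same call (the table, schema, and bucket are placeholders, not values from this repo):

    import apache_beam as beam

    def write_sample_info(rows, temp_location):
      # rows: a PCollection of dicts matching the sample-info schema.
      return rows | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
          'my-project:my_dataset.sample_info',             # hypothetical table
          schema='sample_id:INTEGER,sample_name:STRING',   # hypothetical schema
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
          method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
          custom_gcs_temp_location=temp_location)          # e.g. gs://my-bucket/tmp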

gcp_variant_transforms/transforms/variant_to_bigquery.py

Lines changed: 11 additions & 34 deletions
@@ -16,7 +16,6 @@
 
 from __future__ import absolute_import
 
-import random
 from typing import Dict, List  # pylint: disable=unused-import
 
 import apache_beam as beam
@@ -29,7 +28,6 @@
 from gcp_variant_transforms.libs import processed_variant
 from gcp_variant_transforms.libs import vcf_field_conflict_resolver
 from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy  # pylint: disable=unused-import
-from gcp_variant_transforms.transforms import limit_write
 
 
 # TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
@@ -67,6 +65,7 @@ def __init__(
       self,
       output_table,  # type: str
       header_fields,  # type: vcf_header_io.VcfHeader
+      temp_location,  # type: str
       variant_merger=None,  # type: variant_merge_strategy.VariantMergeStrategy
       proc_var_factory=None,  # type: processed_variant.ProcessedVariantFactory
       # TODO(bashir2): proc_var_factory is a required argument and if `None` is
@@ -75,8 +74,8 @@ def __init__(
       update_schema_on_append=False,  # type: bool
       allow_incompatible_records=False,  # type: bool
       omit_empty_sample_calls=False,  # type: bool
-      num_bigquery_write_shards=1,  # type: int
-      null_numeric_value_replacement=None  # type: int
+      null_numeric_value_replacement=None  # type: int,
+
       ):
     # type: (...) -> None
     """Initializes the transform.
@@ -99,15 +98,14 @@ def __init__(
         + schema if there is a mismatch.
       omit_empty_sample_calls: If true, samples that don't have a given call
         will be omitted.
-      num_bigquery_write_shards: If > 1, we will limit number of sources which
-        are used for writing to the output BigQuery table.
       null_numeric_value_replacement: the value to use instead of null for
         numeric (float/int/long) lists. For instance, [0, None, 1] will become
         [0, `null_numeric_value_replacement`, 1]. If not set, the value will set
         to bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT.
     """
     self._output_table = output_table
     self._header_fields = header_fields
+    self._temp_location = temp_location
     self._variant_merger = variant_merger
     self._proc_var_factory = proc_var_factory
     self._append = append
@@ -125,7 +123,6 @@ def __init__(
 
     self._allow_incompatible_records = allow_incompatible_records
     self._omit_empty_sample_calls = omit_empty_sample_calls
-    self._num_bigquery_write_shards = num_bigquery_write_shards
     if update_schema_on_append:
       bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
                                                      self._output_table)
@@ -136,35 +133,15 @@ def expand(self, pcoll):
             self._bigquery_row_generator,
             self._allow_incompatible_records,
             self._omit_empty_sample_calls))
-    if self._num_bigquery_write_shards > 1:
-      # We split data into self._num_bigquery_write_shards random partitions
-      # and then write each part to final BQ by appending them together.
-      # Combined with LimitWrite transform, this will avoid the BQ failure.
-      bq_row_partitions = bq_rows | beam.Partition(
-          lambda _, n: random.randint(0, n - 1),
-          self._num_bigquery_write_shards)
-      bq_writes = []
-      for i in range(self._num_bigquery_write_shards):
-        bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
-                   limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
-        bq_writes.append(
-            bq_rows | 'WriteToBigQuery' + str(i) >>
-            beam.io.Write(beam.io.BigQuerySink(
+    return (bq_rows
+            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                 self._output_table,
                 schema=self._schema,
                 create_disposition=(
                     beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
                 write_disposition=(
-                    beam.io.BigQueryDisposition.WRITE_APPEND))))
-      return bq_writes
-    else:
-      return (bq_rows
-              | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
-                  self._output_table,
-                  schema=self._schema,
-                  create_disposition=(
-                      beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
-                  write_disposition=(
-                      beam.io.BigQueryDisposition.WRITE_APPEND
-                      if self._append
-                      else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
+                    beam.io.BigQueryDisposition.WRITE_APPEND
+                    if self._append
+                    else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
+                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+                custom_gcs_temp_location=self._temp_location))
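
For reference, the write path removed here boiled down to the following idiom: rows were assigned to random shards and each shard was appended to the table with the legacy sink. This is a simplified standalone sketch, with the dropped LimitWrite step reduced to a comment and the shard count taken from the old documentation default:

    import random
    import apache_beam as beam

    def sharded_write(bq_rows, table, schema, num_shards=20):
      # Assign every row to one of `num_shards` outputs at random.
      shards = bq_rows | beam.Partition(
          lambda _, n: random.randint(0, n - 1), num_shards)
      writes = []
      for i in range(num_shards):
        # (The real code first capped each shard with limit_write.LimitWrite.)
        writes.append(
            shards[i] | 'WriteToBigQuery' + str(i) >> beam.io.Write(
                beam.io.BigQuerySink(
                    table,
                    schema=schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
      return writes

The new code collapses all of this into a single WriteToBigQuery step using FILE_LOADS.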

gcp_variant_transforms/vcf_to_bq.py

Lines changed: 7 additions & 4 deletions
@@ -383,7 +383,8 @@ def _run_annotation_pipeline(known_args, pipeline_args):
 
 def _create_sample_info_table(pipeline,  # type: beam.Pipeline
                               pipeline_mode,  # type: PipelineModes
-                              known_args,  # type: argparse.Namespace
+                              known_args,  # type: argparse.Namespace,
+                              pipeline_args,  # type: List[str]
                              ):
   # type: (...) -> None
   headers = pipeline_common.read_headers(
@@ -404,7 +405,6 @@ def run(argv=None):
   logging.info('Command: %s', ' '.join(argv or sys.argv))
   known_args, pipeline_args = pipeline_common.parse_args(argv,
                                                          _COMMAND_LINE_OPTIONS)
-
   if known_args.auto_flags_experiment:
     _get_input_dimensions(known_args, pipeline_args)
 
@@ -480,6 +480,8 @@ def run(argv=None):
   num_partitions = 1
 
   if known_args.output_table:
+    options = pipeline_options.PipelineOptions(pipeline_args)
+    google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
     for i in range(num_partitions):
       table_suffix = ''
       if partitioner and partitioner.get_partition_name(i):
@@ -489,17 +491,18 @@ def run(argv=None):
           variant_to_bigquery.VariantToBigQuery(
               table_name,
               header_fields,
+              google_cloud_options.temp_location,
               variant_merger,
               processed_variant_factory,
              append=known_args.append,
              update_schema_on_append=known_args.update_schema_on_append,
              allow_incompatible_records=known_args.allow_incompatible_records,
              omit_empty_sample_calls=known_args.omit_empty_sample_calls,
-             num_bigquery_write_shards=known_args.num_bigquery_write_shards,
              null_numeric_value_replacement=(
                  known_args.null_numeric_value_replacement)))
     if known_args.generate_sample_info_table:
-      _create_sample_info_table(pipeline, pipeline_mode, known_args)
+      _create_sample_info_table(
+          pipeline, pipeline_mode, known_args, pipeline_args)
 
     if known_args.output_avro_path:
       # TODO(bashir2): Add an integration test that outputs to Avro files and
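
The temp location handed to the write transforms comes from Beam's standard `--temp_location` flag, read through GoogleCloudOptions. A standalone sketch of that lookup (the bucket name is a placeholder):

    from apache_beam.options import pipeline_options

    pipeline_args = ['--temp_location', 'gs://my-temp-bucket/tmp']  # placeholder
    options = pipeline_options.PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
    # This is the value now passed to VariantToBigQuery above.
    print(google_cloud_options.temp_location)  # gs://my-temp-bucket/tmp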
