Skip to content

Commit 666fa4a

Browse files
committed
Use new BigQuery sink.
1 parent c5e1e0f commit 666fa4a

File tree

12 files changed

+30
-215
lines changed

12 files changed

+30
-215
lines changed

docs/large_inputs.md

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ Default settings:
1414
--worker_machine_type <default n1-standard-1> \
1515
--disk_size_gb <default 250> \
1616
--worker_disk_type <default PD> \
17-
--num_bigquery_write_shards <default 1> \
1817
--partition_config_path <default None> \
1918
```
2019

@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
9897
output rather than just being specified once as in the VCF header), you
9998
typically need 3 to 4 times the total size of the raw VCF files.
10099

101-
In addition, if [merging](variant_merging.md) or
102-
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
100+
In addition, if [merging](variant_merging.md) is enabled, you may
103101
need more disk per worker (e.g. 500GB) as the same variants need to be
104102
aggregated together on one machine.
105103

@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
110108
they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
111109
limitations.
112110

113-
As a result, we recommend using SSDs if [merging](variant_merge.md) or
114-
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
115-
operations require "shuffling" the data (i.e. redistributing the data among
116-
workers), which require significant disk I/O.
111+
As a result, we recommend using SSDs if [merging](variant_merging.md) is enabled:
112+
this operation requires "shuffling" the data (i.e. redistributing the data
113+
among workers), which requires significant disk I/O.
117114

118115
Set
119116
`--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
120117
to use SSDs.
121118

122-
### `--num_bigquery_write_shards`
123-
124-
Currently, the write operation to BigQuery in Dataflow is performed as a
125-
postprocessing step after the main transforms are done. As a workaround for
126-
BigQuery write limitations (more details
127-
[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
128-
we have added "sharding" when writing to BigQuery. This makes the data load
129-
to BigQuery significantly faster as it parallelizes the process and enables
130-
loading large (>5TB) data to BigQuery at once.
131-
132-
As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
133-
any data that has more than 1 billion rows (after merging) or 1TB of final
134-
output. You may use a smaller number of write shards (e.g. 5) when using
135-
[partitioned output](#--partition_config_path) as each partition also acts as a
136-
"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
137-
fail as there is a maximum limit on the number of concurrent writes per table.
138-
139119
### `--partition_config_path`
140120

141121
Partitioning the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
146126
As a result, we recommend setting the partition config for very large data
147127
where possible. Please see the [documentation](partitioning.md) for more
148128
details.
149-

gcp_variant_transforms/options/variant_transform_options.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,7 @@ def add_arguments(self, parser):
195195
parser.add_argument(
196196
'--num_bigquery_write_shards',
197197
type=int, default=1,
198-
help=('Before writing the final result to output BigQuery, the data is '
199-
'sharded to avoid a known failure for very large inputs (issue '
200-
'#199). Setting this flag to 1 will avoid this extra sharding.'
201-
'It is recommended to use 20 for loading large inputs without '
202-
'merging. Use a smaller value (2 or 3) if both merging and '
203-
'optimize_for_large_inputs are enabled.'))
198+
help=('This flag is deprecated and may be removed in future releases.'))
204199
parser.add_argument(
205200
'--null_numeric_value_replacement',
206201
type=int,

gcp_variant_transforms/pipeline_common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ def parse_args(argv, command_line_options):
7575
if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
7676
known_args.all_patterns = _get_all_patterns(
7777
known_args.input_pattern, known_args.input_file)
78+
79+
# Enable new BQ sink experiment.
80+
pipeline_args += ['--experiment', 'use_beam_bq_sink']
7881
return known_args, pipeline_args
7982

8083

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"worker_machine_type": "n1-standard-64",
1010
"max_num_workers": "64",
1111
"num_workers": "20",
12-
"num_bigquery_write_shards": "20",
1312
"assertion_configs": [
1413
{
1514
"query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"worker_machine_type": "n1-standard-16",
1010
"max_num_workers": "20",
1111
"num_workers": "20",
12-
"num_bigquery_write_shards": "2",
1312
"assertion_configs": [
1413
{
1514
"query": ["NUM_ROWS_QUERY"],
@@ -68,4 +67,3 @@
6867
]
6968
}
7069
]
71-

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
"worker_machine_type": "n1-standard-16",
88
"max_num_workers": "20",
99
"num_workers": "20",
10-
"num_bigquery_write_shards": "20",
1110
"assertion_configs": [
1211
{
1312
"query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/transforms/limit_write.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

gcp_variant_transforms/transforms/limit_write_test.py

Lines changed: 0 additions & 76 deletions
This file was deleted.

gcp_variant_transforms/transforms/sample_info_to_bigquery.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def __init__(self, output_table_prefix, append=False,
6262
self._append = append
6363
self.samples_span_multiple_files = samples_span_multiple_files
6464
self._schema = sample_info_table_schema_generator.generate_schema()
65+
self._temp_location = temp_location
6566

6667
def expand(self, pcoll):
6768
return (pcoll
@@ -76,4 +77,5 @@ def expand(self, pcoll):
7677
beam.io.BigQueryDisposition.WRITE_APPEND
7778
if self._append
7879
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
79-
method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
80+
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
81+
custom_gcs_temp_location=self._temp_location))

gcp_variant_transforms/transforms/variant_to_bigquery.py

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
from __future__ import absolute_import
1818

19-
import random
2019
from typing import Dict, List # pylint: disable=unused-import
2120

2221
import apache_beam as beam
@@ -29,13 +28,6 @@
2928
from gcp_variant_transforms.libs import processed_variant
3029
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
3130
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
32-
from gcp_variant_transforms.transforms import limit_write
33-
34-
35-
# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
36-
# see: https://issues.apache.org/jira/browse/BEAM-2801
37-
# This has to be less than 10000.
38-
_WRITE_SHARDS_LIMIT = 1000
3931

4032

4133
@beam.typehints.with_input_types(processed_variant.ProcessedVariant)
@@ -67,6 +59,7 @@ def __init__(
6759
self,
6860
output_table, # type: str
6961
header_fields, # type: vcf_header_io.VcfHeader
62+
temp_location, # type: str
7063
variant_merger=None, # type: variant_merge_strategy.VariantMergeStrategy
7164
proc_var_factory=None, # type: processed_variant.ProcessedVariantFactory
7265
# TODO(bashir2): proc_var_factory is a required argument and if `None` is
@@ -75,8 +68,8 @@ def __init__(
7568
update_schema_on_append=False, # type: bool
7669
allow_incompatible_records=False, # type: bool
7770
omit_empty_sample_calls=False, # type: bool
78-
num_bigquery_write_shards=1, # type: int
79-
null_numeric_value_replacement=None # type: int
71+
null_numeric_value_replacement=None # type: int
72+
8073
):
8174
# type: (...) -> None
8275
"""Initializes the transform.
@@ -99,15 +92,14 @@ def __init__(
9992
+ schema if there is a mismatch.
10093
omit_empty_sample_calls: If true, samples that don't have a given call
10194
will be omitted.
102-
num_bigquery_write_shards: If > 1, we will limit number of sources which
103-
are used for writing to the output BigQuery table.
10495
null_numeric_value_replacement: the value to use instead of null for
10596
numeric (float/int/long) lists. For instance, [0, None, 1] will become
10697
[0, `null_numeric_value_replacement`, 1]. If not set, the value will set
10798
to bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT.
10899
"""
109100
self._output_table = output_table
110101
self._header_fields = header_fields
102+
self._temp_location = temp_location
111103
self._variant_merger = variant_merger
112104
self._proc_var_factory = proc_var_factory
113105
self._append = append
@@ -125,7 +117,6 @@ def __init__(
125117

126118
self._allow_incompatible_records = allow_incompatible_records
127119
self._omit_empty_sample_calls = omit_empty_sample_calls
128-
self._num_bigquery_write_shards = num_bigquery_write_shards
129120
if update_schema_on_append:
130121
bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
131122
self._output_table)
@@ -136,35 +127,15 @@ def expand(self, pcoll):
136127
self._bigquery_row_generator,
137128
self._allow_incompatible_records,
138129
self._omit_empty_sample_calls))
139-
if self._num_bigquery_write_shards > 1:
140-
# We split data into self._num_bigquery_write_shards random partitions
141-
# and then write each part to final BQ by appending them together.
142-
# Combined with LimitWrite transform, this will avoid the BQ failure.
143-
bq_row_partitions = bq_rows | beam.Partition(
144-
lambda _, n: random.randint(0, n - 1),
145-
self._num_bigquery_write_shards)
146-
bq_writes = []
147-
for i in range(self._num_bigquery_write_shards):
148-
bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
149-
limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
150-
bq_writes.append(
151-
bq_rows | 'WriteToBigQuery' + str(i) >>
152-
beam.io.Write(beam.io.BigQuerySink(
130+
return (bq_rows
131+
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
153132
self._output_table,
154133
schema=self._schema,
155134
create_disposition=(
156135
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
157136
write_disposition=(
158-
beam.io.BigQueryDisposition.WRITE_APPEND))))
159-
return bq_writes
160-
else:
161-
return (bq_rows
162-
| 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
163-
self._output_table,
164-
schema=self._schema,
165-
create_disposition=(
166-
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
167-
write_disposition=(
168-
beam.io.BigQueryDisposition.WRITE_APPEND
169-
if self._append
170-
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
137+
beam.io.BigQueryDisposition.WRITE_APPEND
138+
if self._append
139+
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
140+
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
141+
custom_gcs_temp_location=self._temp_location))

0 commit comments

Comments
 (0)