Skip to content

Commit d2ef98c

Browse files
committed
Use new BigQuery sink.
1 parent 4394cde commit d2ef98c

File tree

12 files changed

+26
-215
lines changed

12 files changed

+26
-215
lines changed

docs/large_inputs.md

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ Default settings:
1414
--worker_machine_type <default n1-standard-1> \
1515
--disk_size_gb <default 250> \
1616
--worker_disk_type <default PD> \
17-
--num_bigquery_write_shards <default 1> \
1817
--partition_config_path <default None> \
1918
```
2019

@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
9897
output rather than just being specified once as in the VCF header), you
9998
typically need 3 to 4 times the total size of the raw VCF files.
10099

101-
In addition, if [merging](variant_merging.md) or
102-
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
100+
In addition, if [merging](variant_merging.md) is enabled, you may
103101
need more disk per worker (e.g. 500GB) as the same variants need to be
104102
aggregated together on one machine.
105103

@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
110108
they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
111109
limitations.
112110

113-
As a result, we recommend using SSDs if [merging](variant_merge.md) or
114-
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
115-
operations require "shuffling" the data (i.e. redistributing the data among
116-
workers), which require significant disk I/O.
111+
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
112+
this operation requires "shuffling" the data (i.e. redistributing the data
113+
among workers), which requires significant disk I/O.
117114

118115
Set
119116
`--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
120117
to use SSDs.
121118

122-
### `--num_bigquery_write_shards`
123-
124-
Currently, the write operation to BigQuery in Dataflow is performed as a
125-
postprocessing step after the main transforms are done. As a workaround for
126-
BigQuery write limitations (more details
127-
[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
128-
we have added "sharding" when writing to BigQuery. This makes the data load
129-
to BigQuery significantly faster as it parallelizes the process and enables
130-
loading large (>5TB) data to BigQuery at once.
131-
132-
As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
133-
any data that has more than 1 billion rows (after merging) or 1TB of final
134-
output. You may use a smaller number of write shards (e.g. 5) when using
135-
[sharded output](#--sharding_config_path) as each partition also acts as a
136-
"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
137-
fail as there is a maximum limit on the number of concurrent writes per table.
138-
139119
### `--sharding_config_path`
140120

141121
Sharding the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
146126
As a result, we recommend setting the partition config for very large data
147127
where possible. Please see the [documentation](sharding.md) for more
148128
details.
149-

gcp_variant_transforms/options/variant_transform_options.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,7 @@ def add_arguments(self, parser):
185185
parser.add_argument(
186186
'--num_bigquery_write_shards',
187187
type=int, default=1,
188-
help=('Before writing the final result to output BigQuery, the data is '
189-
'sharded to avoid a known failure for very large inputs (issue '
190-
'#199). Setting this flag to 1 will avoid this extra sharding.'
191-
'It is recommended to use 20 for loading large inputs without '
192-
'merging. Use a smaller value (2 or 3) if both merging and '
193-
'optimize_for_large_inputs are enabled.'))
188+
help=('This flag is deprecated and may be removed in future releases.'))
194189
parser.add_argument(
195190
'--null_numeric_value_replacement',
196191
type=int,

gcp_variant_transforms/pipeline_common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ def parse_args(argv, command_line_options):
7777
if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
7878
known_args.all_patterns = _get_all_patterns(
7979
known_args.input_pattern, known_args.input_file)
80+
81+
# Enable new BQ sink experiment.
82+
pipeline_args += ['--experiment', 'use_beam_bq_sink']
8083
return known_args, pipeline_args
8184

8285

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"worker_machine_type": "n1-standard-64",
1010
"max_num_workers": "64",
1111
"num_workers": "20",
12-
"num_bigquery_write_shards": "20",
1312
"assertion_configs": [
1413
{
1514
"query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"worker_machine_type": "n1-standard-16",
1010
"max_num_workers": "20",
1111
"num_workers": "20",
12-
"num_bigquery_write_shards": "2",
1312
"assertion_configs": [
1413
{
1514
"query": ["NUM_ROWS_QUERY"],
@@ -68,4 +67,3 @@
6867
]
6968
}
7069
]
71-

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
"worker_machine_type": "n1-standard-16",
88
"max_num_workers": "20",
99
"num_workers": "20",
10-
"num_bigquery_write_shards": "20",
1110
"assertion_configs": [
1211
{
1312
"query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/transforms/limit_write.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

gcp_variant_transforms/transforms/limit_write_test.py

Lines changed: 0 additions & 76 deletions
This file was deleted.

gcp_variant_transforms/transforms/sample_info_to_bigquery.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def __init__(self, output_table_prefix, sample_name_encoding, append=False):
6969
self._append = append
7070
self._sample_name_encoding = sample_name_encoding
7171
self._schema = sample_info_table_schema_generator.generate_schema()
72+
self._temp_location = temp_location
7273

7374
def expand(self, pcoll):
7475
return (pcoll
@@ -82,4 +83,6 @@ def expand(self, pcoll):
8283
write_disposition=(
8384
beam.io.BigQueryDisposition.WRITE_APPEND
8485
if self._append
85-
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
86+
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
87+
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
88+
custom_gcs_temp_location=self._temp_location))

gcp_variant_transforms/transforms/variant_to_bigquery.py

Lines changed: 9 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
from __future__ import absolute_import
1818

19-
import random
2019
from typing import Dict, List # pylint: disable=unused-import
2120

2221
import apache_beam as beam
@@ -29,13 +28,6 @@
2928
from gcp_variant_transforms.libs import processed_variant
3029
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
3130
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
32-
from gcp_variant_transforms.transforms import limit_write
33-
34-
35-
# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
36-
# see: https://issues.apache.org/jira/browse/BEAM-2801
37-
# This has to be less than 10000.
38-
_WRITE_SHARDS_LIMIT = 1000
3931

4032

4133
@beam.typehints.with_input_types(processed_variant.ProcessedVariant)
@@ -71,8 +63,8 @@ def __init__(
7163
update_schema_on_append=False, # type: bool
7264
allow_incompatible_records=False, # type: bool
7365
omit_empty_sample_calls=False, # type: bool
74-
num_bigquery_write_shards=1, # type: int
75-
null_numeric_value_replacement=None # type: int
66+
null_numeric_value_replacement=None # type: int,
67+
7668
):
7769
# type: (...) -> None
7870
"""Initializes the transform.
@@ -88,8 +80,6 @@ def __init__(
8880
+ schema if there is a mismatch.
8981
omit_empty_sample_calls: If true, samples that don't have a given call
9082
will be omitted.
91-
num_bigquery_write_shards: If > 1, we will limit number of sources which
92-
are used for writing to the output BigQuery table.
9383
null_numeric_value_replacement: the value to use instead of null for
9484
numeric (float/int/long) lists. For instance, [0, None, 1] will become
9585
[0, `null_numeric_value_replacement`, 1]. If not set, the value will set
@@ -109,7 +99,6 @@ def __init__(
10999

110100
self._allow_incompatible_records = allow_incompatible_records
111101
self._omit_empty_sample_calls = omit_empty_sample_calls
112-
self._num_bigquery_write_shards = num_bigquery_write_shards
113102
if update_schema_on_append:
114103
bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
115104
self._output_table)
@@ -120,35 +109,15 @@ def expand(self, pcoll):
120109
self._bigquery_row_generator,
121110
self._allow_incompatible_records,
122111
self._omit_empty_sample_calls))
123-
if self._num_bigquery_write_shards > 1:
124-
# We split data into self._num_bigquery_write_shards random partitions
125-
# and then write each part to final BQ by appending them together.
126-
# Combined with LimitWrite transform, this will avoid the BQ failure.
127-
bq_row_partitions = bq_rows | beam.Partition(
128-
lambda _, n: random.randint(0, n - 1),
129-
self._num_bigquery_write_shards)
130-
bq_writes = []
131-
for i in range(self._num_bigquery_write_shards):
132-
bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
133-
limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
134-
bq_writes.append(
135-
bq_rows | 'WriteToBigQuery' + str(i) >>
136-
beam.io.Write(beam.io.BigQuerySink(
112+
return (bq_rows
113+
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
137114
self._output_table,
138115
schema=self._schema,
139116
create_disposition=(
140117
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
141118
write_disposition=(
142-
beam.io.BigQueryDisposition.WRITE_APPEND))))
143-
return bq_writes
144-
else:
145-
return (bq_rows
146-
| 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
147-
self._output_table,
148-
schema=self._schema,
149-
create_disposition=(
150-
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
151-
write_disposition=(
152-
beam.io.BigQueryDisposition.WRITE_APPEND
153-
if self._append
154-
else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
119+
beam.io.BigQueryDisposition.WRITE_APPEND
120+
if self._append
121+
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
122+
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
123+
custom_gcs_temp_location=self._temp_location))

0 commit comments

Comments
 (0)