Skip to content

Commit 4d4481c

Browse files
committed
Use new BigQuery sink.
1 parent c5e1e0f commit 4d4481c

File tree

13 files changed

+30
-218
lines changed

13 files changed

+30
-218
lines changed

cloudbuild_CI.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ steps:
4141
- '--skip_build'
4242
- '--project ${PROJECT_ID}'
4343
- '--image_tag ${COMMIT_SHA}'
44-
- '--run_unit_tests'
45-
- '--run_preprocessor_tests'
46-
- '--run_bq_to_vcf_tests'
4744
- '--run_all_tests'
4845
- '--test_name_prefix cloud-ci-'
4946
id: 'test-gcp-variant-transforms-docker'

docs/large_inputs.md

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ Default settings:
1414
--worker_machine_type <default n1-standard-1> \
1515
--disk_size_gb <default 250> \
1616
--worker_disk_type <default PD> \
17-
--num_bigquery_write_shards <default 1> \
1817
--partition_config_path <default None> \
1918
```
2019

@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
9897
output rather than just being specified once as in the VCF header), you
9998
typically need 3 to 4 times the total size of the raw VCF files.
10099

101-
In addition, if [merging](variant_merging.md) or
102-
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
100+
In addition, if [merging](variant_merging.md) is enabled, you may
103101
need more disk per worker (e.g. 500GB) as the same variants need to be
104102
aggregated together on one machine.
105103

@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
110108
they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
111109
limitations.
112110

113-
As a result, we recommend using SSDs if [merging](variant_merge.md) or
114-
[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
115-
operations require "shuffling" the data (i.e. redistributing the data among
116-
workers), which require significant disk I/O.
111+
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
112+
this operation requires "shuffling" the data (i.e. redistributing the data
113+
among workers), which requires significant disk I/O.
117114

118115
Set
119116
`--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
120117
to use SSDs.
121118

122-
### `--num_bigquery_write_shards`
123-
124-
Currently, the write operation to BigQuery in Dataflow is performed as a
125-
postprocessing step after the main transforms are done. As a workaround for
126-
BigQuery write limitations (more details
127-
[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
128-
we have added "sharding" when writing to BigQuery. This makes the data load
129-
to BigQuery significantly faster as it parallelizes the process and enables
130-
loading large (>5TB) data to BigQuery at once.
131-
132-
As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
133-
any data that has more than 1 billion rows (after merging) or 1TB of final
134-
output. You may use a smaller number of write shards (e.g. 5) when using
135-
[partitioned output](#--partition_config_path) as each partition also acts as a
136-
"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
137-
fail as there is a maximum limit on the number of concurrent writes per table.
138-
139119
### `--partition_config_path`
140120

141121
Partitioning the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
146126
As a result, we recommend setting the partition config for very large data
147127
where possible. Please see the [documentation](partitioning.md) for more
148128
details.
149-

gcp_variant_transforms/options/variant_transform_options.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,7 @@ def add_arguments(self, parser):
195195
parser.add_argument(
196196
'--num_bigquery_write_shards',
197197
type=int, default=1,
198-
help=('Before writing the final result to output BigQuery, the data is '
199-
'sharded to avoid a known failure for very large inputs (issue '
200-
'#199). Setting this flag to 1 will avoid this extra sharding.'
201-
'It is recommended to use 20 for loading large inputs without '
202-
'merging. Use a smaller value (2 or 3) if both merging and '
203-
'optimize_for_large_inputs are enabled.'))
198+
help=('This flag is deprecated and may be removed in future releases.'))
204199
parser.add_argument(
205200
'--null_numeric_value_replacement',
206201
type=int,

gcp_variant_transforms/pipeline_common.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ def parse_args(argv, command_line_options):
7575
if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'):
7676
known_args.all_patterns = _get_all_patterns(
7777
known_args.input_pattern, known_args.input_file)
78+
79+
# Enable new BQ sink experiment.
80+
pipeline_args += ['--experiment', 'use_beam_bq_sink']
7881
return known_args, pipeline_args
7982

8083

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"worker_machine_type": "n1-standard-64",
1010
"max_num_workers": "64",
1111
"num_workers": "20",
12-
"num_bigquery_write_shards": "20",
1312
"assertion_configs": [
1413
{
1514
"query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"worker_machine_type": "n1-standard-16",
1010
"max_num_workers": "20",
1111
"num_workers": "20",
12-
"num_bigquery_write_shards": "2",
1312
"assertion_configs": [
1413
{
1514
"query": ["NUM_ROWS_QUERY"],
@@ -68,4 +67,3 @@
6867
]
6968
}
7069
]
71-

gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
"worker_machine_type": "n1-standard-16",
88
"max_num_workers": "20",
99
"num_workers": "20",
10-
"num_bigquery_write_shards": "20",
1110
"assertion_configs": [
1211
{
1312
"query": ["NUM_ROWS_QUERY"],

gcp_variant_transforms/transforms/limit_write.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

gcp_variant_transforms/transforms/limit_write_test.py

Lines changed: 0 additions & 76 deletions
This file was deleted.

gcp_variant_transforms/transforms/sample_info_to_bigquery.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ def __init__(self, output_table_prefix, append=False,
6262
self._append = append
6363
self.samples_span_multiple_files = samples_span_multiple_files
6464
self._schema = sample_info_table_schema_generator.generate_schema()
65+
self._temp_location = temp_location
6566

6667
def expand(self, pcoll):
6768
return (pcoll
@@ -76,4 +77,5 @@ def expand(self, pcoll):
7677
beam.io.BigQueryDisposition.WRITE_APPEND
7778
if self._append
7879
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
79-
method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
80+
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
81+
custom_gcs_temp_location=self._temp_location))

0 commit comments

Comments
 (0)