Skip to content

Commit 9a865ce

Browse files
authored
Create sample info table (#511)
Create sample info table - Use flag --generate_sample_info_table to form the table. - sample_info table contains sample_id, sample_name and file_path.
1 parent 5044996 commit 9a865ce

12 files changed

+327
-4
lines changed

gcp_variant_transforms/beam_io/vcf_header_io.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from collections import OrderedDict
2020
from functools import partial
21-
from typing import Dict, Iterable # pylint: disable=unused-import
21+
from typing import Dict, Iterable, List # pylint: disable=unused-import
2222
import vcf
2323

2424
import apache_beam as beam
@@ -62,6 +62,7 @@ def __init__(self,
6262
alts=None, # type: OrderedDict[str, vcf.parser._Alt]
6363
formats=None, # type: OrderedDict[str, vcf.parser._Format]
6464
contigs=None, # type: OrderedDict[str, vcf.parser._Contig]
65+
samples=None, # type: List[str]
6566
file_path=None # type: str
6667
):
6768
# type: (...) -> None
@@ -77,6 +78,7 @@ def __init__(self,
7778
alts: A dictionary mapping alt keys to vcf alt metadata values.
7879
formats: A dictionary mapping format keys to vcf format metadata values.
7980
contigs: A dictionary mapping contig keys to vcf contig metadata values.
81+
samples: A list of sample names.
8082
file_path: The full file path of the vcf file.
8183
"""
8284
# type: OrderedDict[str, OrderedDict]
@@ -85,6 +87,7 @@ def __init__(self,
8587
self.alts = self._values_asdict(alts or {})
8688
self.formats = self._values_asdict(formats or {})
8789
self.contigs = self._values_asdict(contigs or {})
90+
self.samples = samples
8891
self.file_path = file_path
8992

9093
def __eq__(self, other):
@@ -145,6 +148,7 @@ def read_records(
145148
alts=vcf_reader.alts,
146149
formats=vcf_reader.formats,
147150
contigs=vcf_reader.contigs,
151+
samples=vcf_reader.samples,
148152
file_path=file_path)
149153

150154
def _read_headers(self, file_path):

gcp_variant_transforms/beam_io/vcf_header_io_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def _get_header_from_reader(vcf_reader, file_path=None):
4242
alts=vcf_reader.alts,
4343
formats=vcf_reader.formats,
4444
contigs=vcf_reader.contigs,
45+
samples=vcf_reader.samples,
4546
file_path=file_path)
4647

4748

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2019 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Generates hashing code for string."""
16+
17+
import json
18+
import sys
19+
from typing import List # pylint: disable=unused-import
20+
21+
import farmhash
22+
23+
24+
def generate_unsigned_hash_code(strings, max_hash_value=sys.maxsize):
  # type: (List[str], int) -> int
  """Generates a forever-fixed hash code for `strings`.

  The hash code generated is in the range [0, max_hash_value). Note that the
  hash code generated by farmhash.fingerprint64 is unsigned.

  Args:
    strings: List of strings to be hashed together as one unit.
    max_hash_value: Exclusive upper bound for the returned hash code.
      Defaults to `sys.maxsize` (present in both Python 2 and 3, unlike the
      Python-2-only `sys.maxint`; equal to it on CPython 2).

  Returns:
    An unsigned integer in the range [0, max_hash_value).
  """
  # JSON-serializing the list yields one canonical string, so the same list
  # of strings always maps to the same fingerprint.
  return farmhash.fingerprint64(json.dumps(strings)) % max_hash_value
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright 2019 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import unittest
16+
17+
from gcp_variant_transforms.libs import hashing_util
18+
19+
20+
class HashingUtilTest(unittest.TestCase):
  """Unit tests for `hashing_util.generate_unsigned_hash_code`."""

  def test_generate_unsigned_hash_code(self):
    strings = ['gs://bucket/blob', 'sample 1']

    # Without an explicit bound, the full unsigned fingerprint is returned.
    self.assertEqual(
        hashing_util.generate_unsigned_hash_code(strings),
        797596659968939265)

    # With a bound, the fingerprint is reduced into [0, max_hash_value).
    self.assertEqual(
        hashing_util.generate_unsigned_hash_code(strings, 1000),
        72)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright 2019 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Generates sample_info table schema."""
16+
17+
from apache_beam.io.gcp.internal.clients import bigquery
18+
19+
from gcp_variant_transforms.libs import bigquery_util
20+
21+
# Column names of the sample_info BigQuery table.
SAMPLE_ID = 'sample_id'
SAMPLE_NAME = 'sample_name'
FILE_PATH = 'file_path'
# Suffix joined to the output table's base name to form the sample info
# table's name (see `compose_table_name`).
TABLE_SUFFIX = 'sample_info'
25+
26+
27+
def compose_table_name(base_name, suffix):
  # type: (str, str) -> str
  """Returns `base_name` and `suffix` joined with an underscore.

  Used to derive auxiliary table names (e.g. the sample info table) from the
  main output table's name.
  """
  return '_'.join([base_name, suffix])
30+
31+
32+
def generate_schema():
  # type: () -> bigquery.TableSchema
  """Returns the BigQuery schema of the sample info table."""
  fields = [
      bigquery.TableFieldSchema(
          name=SAMPLE_ID,
          type=bigquery_util.TableFieldConstants.TYPE_INTEGER,
          mode=bigquery_util.TableFieldConstants.MODE_NULLABLE,
          description='An Integer that uniquely identifies a sample.'),
      bigquery.TableFieldSchema(
          name=SAMPLE_NAME,
          type=bigquery_util.TableFieldConstants.TYPE_STRING,
          mode=bigquery_util.TableFieldConstants.MODE_NULLABLE,
          description=('Name of the sample as we read it from the VCF file.')),
      bigquery.TableFieldSchema(
          name=FILE_PATH,
          type=bigquery_util.TableFieldConstants.TYPE_STRING,
          mode=bigquery_util.TableFieldConstants.MODE_NULLABLE,
          description=('Full file path on GCS of the sample.')),
  ]
  schema = bigquery.TableSchema()
  schema.fields.extend(fields)
  return schema
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright 2019 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for `sample_info_table_schema_generator` module."""
16+
17+
import unittest
18+
19+
from gcp_variant_transforms.libs import sample_info_table_schema_generator
20+
21+
22+
class GenerateSampleInfoTableSchemaTest(unittest.TestCase):
  """Checks the generated schema exposes the expected columns, in order."""

  def test_generate_sample_info_table_schema(self):
    schema = sample_info_table_schema_generator.generate_schema()
    actual_field_names = [field.name for field in schema.fields]
    self.assertEqual(
        [sample_info_table_schema_generator.SAMPLE_ID,
         sample_info_table_schema_generator.SAMPLE_NAME,
         sample_info_table_schema_generator.FILE_PATH],
        actual_field_names)

gcp_variant_transforms/options/variant_transform_options.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from gcp_variant_transforms.beam_io import vcfio
2323
from gcp_variant_transforms.libs import bigquery_sanitizer
2424
from gcp_variant_transforms.libs import bigquery_util
25+
from gcp_variant_transforms.libs import sample_info_table_schema_generator
2526

2627

2728
class VariantTransformsOptions(object):
@@ -144,6 +145,16 @@ def add_arguments(self, parser):
144145
parser.add_argument('--output_table',
145146
default='',
146147
help='BigQuery table to store the results.')
148+
parser.add_argument(
149+
'--generate_sample_info_table',
150+
type='bool', default=False, nargs='?', const=True,
151+
help=('If set to True, a sample info table with the name '
152+
'output_table_ + {} will be created. This table contains a '
153+
'unique sample_id for each sample read from the VCF file. This '
154+
'sample_id can be used to distinguish between sample names. '
155+
'[EXPERIMENTAL]'
156+
).format(sample_info_table_schema_generator.TABLE_SUFFIX))
157+
147158
parser.add_argument(
148159
'--split_alternate_allele_info_fields',
149160
type='bool', default=True, nargs='?', const=True,
@@ -207,6 +218,14 @@ def validate(self, parsed_args, client=None):
207218
if parsed_args.update_schema_on_append:
208219
raise ValueError('--update_schema_on_append requires --append to be '
209220
'true.')
221+
if parsed_args.generate_sample_info_table:
222+
bigquery_util.raise_error_if_table_exists(
223+
client,
224+
project_id,
225+
dataset_id,
226+
'_'.join([table_id,
227+
sample_info_table_schema_generator.TABLE_SUFFIX]))
228+
210229
bigquery_util.raise_error_if_table_exists(client,
211230
project_id,
212231
dataset_id,

gcp_variant_transforms/options/variant_transform_options_test.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ def _make_args(self, args):
9393

9494
def test_valid_table_path(self):
9595
args = self._make_args(['--append',
96-
'--output_table', 'project:dataset.table'])
96+
'--output_table', 'project:dataset.table',
97+
'--generate_sample_info_table'])
9798
client = mock.Mock()
9899
client.datasets.Get.return_value = bigquery.Dataset(
99100
datasetReference=bigquery.DatasetReference(
@@ -104,7 +105,16 @@ def test_existing_table(self):
104105
args = self._make_args(['--append', 'False',
105106
'--output_table', 'project:dataset.table'])
106107
client = mock.Mock()
107-
self.assertRaises(ValueError, self._options.validate, args, client)
108+
with self.assertRaisesRegexp(ValueError, 'project:dataset.table '):
109+
self._options.validate(args, client)
110+
111+
args = self._make_args(['--append', 'False',
112+
'--output_table', 'project:dataset.table',
113+
'--generate_sample_info_table'])
114+
client = mock.Mock()
115+
with self.assertRaisesRegexp(ValueError,
116+
'project:dataset.table_sample_info'):
117+
self._options.validate(args, client)
108118

109119
def test_no_project(self):
110120
args = self._make_args(['--output_table', 'dataset.table'])
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright 2019 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Dict, Union # pylint: disable=unused-import
16+
17+
import apache_beam as beam
18+
19+
from gcp_variant_transforms.beam_io import vcf_header_io # pylint: disable=unused-import
20+
from gcp_variant_transforms.libs import sample_info_table_schema_generator
21+
from gcp_variant_transforms.libs import hashing_util
22+
23+
24+
class ConvertSampleInfoToRow(beam.DoFn):
  """Extracts sample info from `VcfHeader` and converts it to a BigQuery row."""

  def process(self, vcf_header):
    # type: (vcf_header_io.VcfHeader) -> Iterable[Dict[str, Union[int, str]]]
    """Yields one BigQuery row per sample in `vcf_header`.

    Each row contains the hashed sample id, the sample name, and the file
    path the header was read from.
    """
    for sample in vcf_header.samples:
      # Hash both the file path and the sample name so that identically named
      # samples from different files get distinct ids. The hash is bounded by
      # 2^63 so it fits in BigQuery's signed 64-bit INTEGER column.
      sample_id = hashing_util.generate_unsigned_hash_code(
          [vcf_header.file_path, sample], max_hash_value=pow(2, 63))
      row = {
          sample_info_table_schema_generator.SAMPLE_ID: sample_id,
          sample_info_table_schema_generator.SAMPLE_NAME: sample,
          sample_info_table_schema_generator.FILE_PATH: vcf_header.file_path
      }
      yield row
38+
39+
40+
class SampleInfoToBigQuery(beam.PTransform):
  """Writes sample info to BigQuery."""

  def __init__(self, output_table_prefix, append=False):
    # type: (str, bool) -> None
    """Initializes the transform.

    Args:
      output_table_prefix: The prefix of the output BigQuery table; the
        sample info table suffix is appended to it.
      append: If true, existing records in output_table will not be
        overwritten. New records will be appended to those that already exist.
    """
    self._output_table = sample_info_table_schema_generator.compose_table_name(
        output_table_prefix, sample_info_table_schema_generator.TABLE_SUFFIX)
    self._append = append
    self._schema = sample_info_table_schema_generator.generate_schema()

  def expand(self, pcoll):
    """Converts each `VcfHeader` into sample rows and loads them to BigQuery."""
    return (pcoll
            | 'ConvertSampleInfoToBigQueryTableRow' >> beam.ParDo(
                ConvertSampleInfoToRow())
            | 'WriteSampleInfoToBigQuery' >> beam.io.WriteToBigQuery(
                self._output_table,
                schema=self._schema,
                create_disposition=(
                    beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
                write_disposition=(
                    beam.io.BigQueryDisposition.WRITE_APPEND
                    if self._append
                    else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2019 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Tests for `sample_info_to_bigquery` module."""
16+
17+
import unittest
18+
19+
from apache_beam import transforms
20+
from apache_beam.testing import test_pipeline
21+
from apache_beam.testing.util import assert_that
22+
from apache_beam.testing.util import equal_to
23+
24+
from gcp_variant_transforms.beam_io import vcf_header_io
25+
from gcp_variant_transforms.libs import sample_info_table_schema_generator
26+
from gcp_variant_transforms.transforms import sample_info_to_bigquery
27+
28+
29+
class ConvertSampleInfoToRowTest(unittest.TestCase):
  """Tests the `ConvertSampleInfoToRow` DoFn end-to-end in a test pipeline."""

  def test_convert_sample_info_to_row(self):
    vcf_header_1 = vcf_header_io.VcfHeader(samples=['Sample 1', 'Sample 2'],
                                           file_path='file_1')
    vcf_header_2 = vcf_header_io.VcfHeader(samples=['Sample 1', 'Sample 2'],
                                           file_path='file_2')
    expected_rows = [
        {sample_info_table_schema_generator.SAMPLE_ID: 5961690698012655974,
         sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 1',
         sample_info_table_schema_generator.FILE_PATH: 'file_1'},
        {sample_info_table_schema_generator.SAMPLE_ID: 5854056809620188906,
         sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 2',
         sample_info_table_schema_generator.FILE_PATH: 'file_1'},
        {sample_info_table_schema_generator.SAMPLE_ID: 5259968798637352651,
         sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 1',
         sample_info_table_schema_generator.FILE_PATH: 'file_2'},
        {sample_info_table_schema_generator.SAMPLE_ID: 6253115674664185777,
         sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 2',
         sample_info_table_schema_generator.FILE_PATH: 'file_2'}
    ]
    pipeline = test_pipeline.TestPipeline()
    bigquery_rows = (
        pipeline
        | transforms.Create([vcf_header_1, vcf_header_2])
        # `ConvertSampleInfoToRow` takes no constructor arguments; passing the
        # (unused) file-path-to-hash dict raised a TypeError at pipeline
        # construction time.
        | 'ConvertToRow'
        >> transforms.ParDo(sample_info_to_bigquery.ConvertSampleInfoToRow()))

    assert_that(bigquery_rows, equal_to(expected_rows))
    pipeline.run()

0 commit comments

Comments
 (0)