Skip to content

Commit d993334

Browse files
authored
Add a flag to include file in the sample_id hash (#526)
* Add a flag to include file in the sample_id hash
* Rename flag to samples_span_multiple_files
1 parent be516c7 commit d993334

File tree

4 files changed

+58
-12
lines changed

4 files changed

+58
-12
lines changed

gcp_variant_transforms/options/variant_transform_options.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ def add_arguments(self, parser):
155155
'[EXPERIMENTAL]'
156156
).format(sample_info_table_schema_generator.TABLE_SUFFIX))
157157

158+
parser.add_argument(
159+
'--samples_span_multiple_files',
160+
type='bool', default=True, nargs='?', const=True,
161+
help=('If True sample_id will be the hash of [sample_name] thus it '
162+
'will be independent of file_path, otherwise hash of '
163+
'[file_path, sample_name] will be used as sample_id. '))
164+
158165
parser.add_argument(
159166
'--split_alternate_allele_info_fields',
160167
type='bool', default=True, nargs='?', const=True,

gcp_variant_transforms/transforms/sample_info_to_bigquery.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,16 @@
2424
class ConvertSampleInfoToRow(beam.DoFn):
2525
"""Extracts sample info from `VcfHeader` and converts it to a BigQuery row."""
2626

27-
def process(self, vcf_header):
28-
# type: (vcf_header_io.VcfHeader) -> Dict[str, Union[int, str]]
27+
def process(self, vcf_header, samples_span_multiple_files):
28+
# type: (vcf_header_io.VcfHeader, bool) -> Dict[str, Union[int, str]]
2929
for sample in vcf_header.samples:
30-
sample_id = hashing_util.generate_unsigned_hash_code(
31-
[vcf_header.file_path, sample], max_hash_value=pow(2, 63))
30+
if samples_span_multiple_files:
31+
sample_id = hashing_util.generate_unsigned_hash_code(
32+
[sample], max_hash_value=pow(2, 63))
33+
else:
34+
sample_id = hashing_util.generate_unsigned_hash_code(
35+
[vcf_header.file_path, sample], max_hash_value=pow(2, 63))
36+
3237
row = {
3338
sample_info_table_schema_generator.SAMPLE_ID: sample_id,
3439
sample_info_table_schema_generator.SAMPLE_NAME: sample,
@@ -40,24 +45,28 @@ def process(self, vcf_header):
4045
class SampleInfoToBigQuery(beam.PTransform):
4146
"""Writes sample info to BigQuery."""
4247

43-
def __init__(self, output_table_prefix, append=False):
44-
# type: (str, Dict[str, str], bool) -> None
48+
def __init__(self, output_table_prefix, append=False,
49+
samples_span_multiple_files=False):
50+
# type: (str, Dict[str, str], bool, bool) -> None
4551
"""Initializes the transform.
4652
4753
Args:
4854
output_table_prefix: The prefix of the output BigQuery table.
4955
append: If true, existing records in output_table will not be
5056
overwritten. New records will be appended to those that already exist.
57+
samples_span_multiple_files: If true, sample_id = hash#([sample_name]),
58+
otherwise sample_id = hash#([file_path, sample_name]).
5159
"""
5260
self._output_table = sample_info_table_schema_generator.compose_table_name(
5361
output_table_prefix, sample_info_table_schema_generator.TABLE_SUFFIX)
5462
self._append = append
63+
self.samples_span_multiple_files = samples_span_multiple_files
5564
self._schema = sample_info_table_schema_generator.generate_schema()
5665

5766
def expand(self, pcoll):
5867
return (pcoll
5968
| 'ConvertSampleInfoToBigQueryTableRow' >> beam.ParDo(
60-
ConvertSampleInfoToRow())
69+
ConvertSampleInfoToRow(self._samples_span_multiple_files))
6170
| 'WriteSampleInfoToBigQuery' >> beam.io.WriteToBigQuery(
6271
self._output_table,
6372
schema=self._schema,

gcp_variant_transforms/transforms/sample_info_to_bigquery_test.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ def test_convert_sample_info_to_row(self):
3333
file_path='file_1')
3434
vcf_header_2 = vcf_header_io.VcfHeader(samples=['Sample 1', 'Sample 2'],
3535
file_path='file_2')
36-
file_path_to_file_hash = {'file_1': 'hash_1',
37-
'file_2': 'hash_2'}
3836
expected_rows = [
3937
{sample_info_table_schema_generator.SAMPLE_ID: 5961690698012655974,
4038
sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 1',
@@ -55,7 +53,37 @@ def test_convert_sample_info_to_row(self):
5553
| transforms.Create([vcf_header_1, vcf_header_2])
5654
| 'ConvertToRow'
5755
>> transforms.ParDo(sample_info_to_bigquery.ConvertSampleInfoToRow(
58-
file_path_to_file_hash)))
56+
), False))
57+
58+
assert_that(bigquery_rows, equal_to(expected_rows))
59+
pipeline.run()
60+
61+
def test_convert_sample_info_to_row_without_file_in_hash(self):
62+
vcf_header_1 = vcf_header_io.VcfHeader(samples=['Sample 1', 'Sample 2'],
63+
file_path='file_1')
64+
vcf_header_2 = vcf_header_io.VcfHeader(samples=['Sample 1', 'Sample 2'],
65+
file_path='file_2')
66+
expected_rows = [
67+
{sample_info_table_schema_generator.SAMPLE_ID: 6721344017406412066,
68+
sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 1',
69+
sample_info_table_schema_generator.FILE_PATH: 'file_1'},
70+
{sample_info_table_schema_generator.SAMPLE_ID: 7224630242958043176,
71+
sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 2',
72+
sample_info_table_schema_generator.FILE_PATH: 'file_1'},
73+
{sample_info_table_schema_generator.SAMPLE_ID: 6721344017406412066,
74+
sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 1',
75+
sample_info_table_schema_generator.FILE_PATH: 'file_2'},
76+
{sample_info_table_schema_generator.SAMPLE_ID: 7224630242958043176,
77+
sample_info_table_schema_generator.SAMPLE_NAME: 'Sample 2',
78+
sample_info_table_schema_generator.FILE_PATH: 'file_2'}
79+
]
80+
pipeline = test_pipeline.TestPipeline()
81+
bigquery_rows = (
82+
pipeline
83+
| transforms.Create([vcf_header_1, vcf_header_2])
84+
| 'ConvertToRow'
85+
>> transforms.ParDo(sample_info_to_bigquery.ConvertSampleInfoToRow(
86+
), True))
5987

6088
assert_that(bigquery_rows, equal_to(expected_rows))
6189
pipeline.run()

gcp_variant_transforms/vcf_to_bq.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,10 @@ def _create_sample_info_table(pipeline, # type: beam.Pipeline
388388
pipeline_mode,
389389
known_args.all_patterns)
390390
_ = (headers | 'SampleInfoToBigQuery' >>
391-
sample_info_to_bigquery.SampleInfoToBigQuery(known_args.output_table,
392-
known_args.append))
391+
sample_info_to_bigquery.SampleInfoToBigQuery(
392+
known_args.output_table,
393+
known_args.append,
394+
known_args.samples_span_multiple_files))
393395

394396

395397
def run(argv=None):

0 commit comments

Comments
 (0)