Skip to content

Commit 4394cde

Browse files
authored
Store sample ID hash hex string in BQ tables. (#542)
This PR is an initial step of changing the way we handle sample names in our tables. In future PRs, this field in our codebase and in BQ tables (currently call.name) should be renamed to call.sample_id and then stored as hash integer instead of a hex string.
1 parent e3b1cc7 commit 4394cde

File tree

11 files changed

+308
-155
lines changed

11 files changed

+308
-155
lines changed

gcp_variant_transforms/beam_io/vcf_parser.py

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from __future__ import absolute_import
2121

2222
from collections import namedtuple
23+
import enum
2324
from typing import Iterable # pylint: disable=unused-import
2425
import logging
2526
import os
@@ -30,6 +31,7 @@
3031
from pysam import libcbcf
3132

3233
from gcp_variant_transforms.beam_io import bgzf
34+
from gcp_variant_transforms.libs import hashing_util
3335

3436
# Stores data about failed VCF record reads. `line` is the text line that
3537
# caused the failed read and `file_name` is the name of the file that the read
@@ -55,6 +57,13 @@
5557
INFO_HEADER_TAG = '##INFO'
5658
LAST_HEADER_LINE_PREFIX = '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO'
5759

60+
61+
class SampleNameEncoding(enum.Enum):
62+
"""An Enum specifying the way we encode sample_name."""
63+
WITHOUT_FILE_PATH = 0
64+
WITH_FILE_PATH = 1
65+
66+
5867
class Variant(object):
5968
"""A class to store info about a genomic variant.
6069
@@ -244,24 +253,27 @@ class VcfParser(object):
244253
```
245254
"""
246255

247-
def __init__(self,
248-
file_name, # type: str
249-
range_tracker, # type: range_trackers.OffsetRangeTracker
250-
file_pattern, # type: str
251-
compression_type, # type: str
252-
allow_malformed_records, # type: bool
253-
representative_header_lines=None, # type: List[str]
254-
splittable_bgzf=False, # type: bool
255-
pre_infer_headers=False, # type: bool
256-
**kwargs # type: **str
257-
):
256+
def __init__(
257+
self,
258+
file_name, # type: str
259+
range_tracker, # type: range_trackers.OffsetRangeTracker
260+
file_pattern, # type: str
261+
compression_type, # type: str
262+
allow_malformed_records, # type: bool
263+
representative_header_lines=None, # type: List[str]
264+
splittable_bgzf=False, # type: bool
265+
pre_infer_headers=False, # type: bool
266+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH, # type: int
267+
**kwargs # type: **str
268+
):
258269
# type: (...) -> None
259270
# If `representative_header_lines` is given, header lines in `file_name`
260271
# are ignored; refer to _process_header_lines() logic.
261272
self._representative_header_lines = representative_header_lines
262273
self._file_name = file_name
263274
self._allow_malformed_records = allow_malformed_records
264275
self._pre_infer_headers = pre_infer_headers
276+
self._sample_name_encoding = sample_name_encoding
265277

266278
if splittable_bgzf:
267279
text_source = bgzf.BGZFBlockSource(
@@ -405,17 +417,19 @@ class PySamParser(VcfParser):
405417
class - we could only use a single pipe, but it will divert the parsers.
406418
"""
407419

408-
def __init__(self,
409-
file_name, # type: str
410-
range_tracker, # type: range_trackers.OffsetRangeTracker
411-
compression_type, # type: str
412-
allow_malformed_records, # type: bool
413-
file_pattern=None, # type: str
414-
representative_header_lines=None, # type: List[str]
415-
splittable_bgzf=False, # type: bool
416-
pre_infer_headers=False, # type: bool
417-
**kwargs # type: **str
418-
):
420+
def __init__(
421+
self,
422+
file_name, # type: str
423+
range_tracker, # type: range_trackers.OffsetRangeTracker
424+
compression_type, # type: str
425+
allow_malformed_records, # type: bool
426+
file_pattern=None, # type: str
427+
representative_header_lines=None, # type: List[str]
428+
splittable_bgzf=False, # type: bool
429+
pre_infer_headers=False, # type: bool
430+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH, # type: int
431+
**kwargs # type: **str
432+
):
419433
# type: (...) -> None
420434
super(PySamParser, self).__init__(file_name,
421435
range_tracker,
@@ -425,12 +439,15 @@ def __init__(self,
425439
representative_header_lines,
426440
splittable_bgzf,
427441
pre_infer_headers,
442+
sample_name_encoding,
428443
**kwargs)
429444
# These members will be properly initiated in _init_parent_process().
430445
self._vcf_reader = None
431446
self._to_child = None
432447
self._original_info_list = None
433448
self._process_pid = None
449+
self._encoded_sample_names = {}
450+
self._sample_name_encoding = sample_name_encoding
434451

435452
def send_kill_signal_to_child(self):
436453
self._to_child.write('\n')
@@ -592,6 +609,17 @@ def _convert_field(self, value, is_phaseset=False):
592609
value = value.encode('utf-8')
593610
return str(value)
594611

612+
def _lookup_encoded_sample_name(self, sample_name):
613+
sample_id = self._encoded_sample_names.get(sample_name)
614+
if not sample_id:
615+
if self._sample_name_encoding == SampleNameEncoding.WITH_FILE_PATH:
616+
sample_id = hex(hashing_util.generate_sample_id(
617+
sample_name, self._file_name))
618+
else:
619+
sample_id = hex(hashing_util.generate_sample_id(sample_name))
620+
self._encoded_sample_names[sample_name] = sample_id
621+
return sample_id
622+
595623
def _get_variant_calls(self, samples):
596624
# type: (libcvcf.VariantRecordSamples) -> List[VariantCall]
597625
calls = []
@@ -623,6 +651,7 @@ def _get_variant_calls(self, samples):
623651
# before settings default phaseset value.
624652
if phaseset is None and sample.phased and len(genotype) > 1:
625653
phaseset = DEFAULT_PHASESET_VALUE
626-
calls.append(VariantCall(name, genotype, phaseset, info))
654+
encoded_name = self._lookup_encoded_sample_name(name)
655+
calls.append(VariantCall(encoded_name, genotype, phaseset, info))
627656

628657
return calls

gcp_variant_transforms/beam_io/vcfio.py

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
MISSING_GENOTYPE_VALUE = vcf_parser.MISSING_GENOTYPE_VALUE
4848
Variant = vcf_parser.Variant
4949
VariantCall = vcf_parser.VariantCall
50+
SampleNameEncoding = vcf_parser.SampleNameEncoding
5051

5152

5253
class _ToVcfRecordCoder(coders.Coder):
@@ -58,7 +59,6 @@ def encode(self, variant):
5859
encoded_info = self._encode_variant_info(variant)
5960
format_keys = self._get_variant_format_keys(variant)
6061
encoded_calls = self._encode_variant_calls(variant, format_keys)
61-
6262
columns = [
6363
variant.reference_name,
6464
None if variant.start is None else variant.start + 1,
@@ -182,15 +182,17 @@ class _VcfSource(filebasedsource.FileBasedSource):
182182

183183
DEFAULT_VCF_READ_BUFFER_SIZE = 65536 # 64kB
184184

185-
def __init__(self,
186-
file_pattern, # type: str
187-
representative_header_lines=None, # type: List[str]
188-
compression_type=CompressionTypes.AUTO, # type: str
189-
buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE, # type: int
190-
validate=True, # type: bool
191-
allow_malformed_records=False, # type: bool
192-
pre_infer_headers=False, # type: bool
193-
):
185+
def __init__(
186+
self,
187+
file_pattern, # type: str
188+
representative_header_lines=None, # type: List[str]
189+
compression_type=CompressionTypes.AUTO, # type: str
190+
buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE, # type: int
191+
validate=True, # type: bool
192+
allow_malformed_records=False, # type: bool
193+
pre_infer_headers=False, # type: bool
194+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH # type: int
195+
):
194196
# type: (...) -> None
195197
super(_VcfSource, self).__init__(file_pattern,
196198
compression_type=compression_type,
@@ -200,6 +202,8 @@ def __init__(self,
200202
self._buffer_size = buffer_size
201203
self._allow_malformed_records = allow_malformed_records
202204
self._pre_infer_headers = pre_infer_headers
205+
self._sample_name_encoding = sample_name_encoding
206+
203207

204208
def read_records(self,
205209
file_name, # type: str
@@ -214,6 +218,7 @@ def read_records(self,
214218
file_pattern=self._pattern,
215219
representative_header_lines=self._representative_header_lines,
216220
pre_infer_headers=self._pre_infer_headers,
221+
sample_name_encoding=self._sample_name_encoding,
217222
buffer_size=self._buffer_size,
218223
skip_header_lines=0)
219224

@@ -229,7 +234,9 @@ def __init__(self,
229234
input_files,
230235
representative_header_lines,
231236
allow_malformed_records,
232-
pre_infer_headers):
237+
pre_infer_headers,
238+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH
239+
):
233240
# type: (List[str], List[str], bool) -> None
234241
"""Initializes the transform.
235242
@@ -239,11 +246,16 @@ def __init__(self,
239246
VCF files.
240247
allow_malformed_records: If true, malformed records from VCF files will be
241248
returned as `MalformedVcfRecord` instead of failing the pipeline.
249+
pre_infer_headers: If true, drop headers and make sure PySam return the
250+
exact data for variants and calls, without type matching.
251+
sample_name_encoding: specify how we want to encode sample_name mainly
252+
to deal with same sample_name used across multiple VCF files.
242253
"""
243254
self._input_files = input_files
244255
self._representative_header_lines = representative_header_lines
245256
self._allow_malformed_records = allow_malformed_records
246257
self._pre_infer_headers = pre_infer_headers
258+
self._sample_name_encoding = sample_name_encoding
247259

248260
def _read_records(self, (file_path, block)):
249261
# type: (Tuple[str, Block]) -> Iterable(Variant)
@@ -255,7 +267,8 @@ def _read_records(self, (file_path, block)):
255267
self._allow_malformed_records,
256268
representative_header_lines=self._representative_header_lines,
257269
splittable_bgzf=True,
258-
pre_infer_headers=self._pre_infer_headers)
270+
pre_infer_headers=self._pre_infer_headers,
271+
sample_name_encoding=self._sample_name_encoding)
259272

260273
for record in record_iterator:
261274
yield record
@@ -286,6 +299,7 @@ def __init__(
286299
validate=True, # type: bool
287300
allow_malformed_records=False, # type: bool
288301
pre_infer_headers=False, # type: bool
302+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH, # type: int
289303
**kwargs # type: **str
290304
):
291305
# type: (...) -> None
@@ -302,6 +316,10 @@ def __init__(
302316
underlying file_path's extension will be used to detect the compression.
303317
validate: flag to verify that the files exist during the pipeline creation
304318
time.
319+
pre_infer_headers: If true, drop headers and make sure PySam return the
320+
exact data for variants and calls, without type matching.
321+
sample_name_encoding: specify how we want to encode sample_name mainly
322+
to deal with same sample_name used across multiple VCF files
305323
"""
306324
super(ReadFromVcf, self).__init__(**kwargs)
307325

@@ -311,20 +329,23 @@ def __init__(
311329
compression_type,
312330
validate=validate,
313331
allow_malformed_records=allow_malformed_records,
314-
pre_infer_headers=pre_infer_headers)
332+
pre_infer_headers=pre_infer_headers,
333+
sample_name_encoding=sample_name_encoding)
315334

316335
def expand(self, pvalue):
317336
return pvalue.pipeline | Read(self._source)
318337

319338

320339
def _create_vcf_source(
321340
file_pattern=None, representative_header_lines=None, compression_type=None,
322-
allow_malformed_records=None, pre_infer_headers=False):
341+
allow_malformed_records=None, pre_infer_headers=False,
342+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH):
323343
return _VcfSource(file_pattern=file_pattern,
324344
representative_header_lines=representative_header_lines,
325345
compression_type=compression_type,
326346
allow_malformed_records=allow_malformed_records,
327-
pre_infer_headers=pre_infer_headers)
347+
pre_infer_headers=pre_infer_headers,
348+
sample_name_encoding=sample_name_encoding)
328349

329350

330351
class ReadAllFromVcf(PTransform):
@@ -348,6 +369,7 @@ def __init__(
348369
compression_type=CompressionTypes.AUTO, # type: str
349370
allow_malformed_records=False, # type: bool
350371
pre_infer_headers=False, # type: bool
372+
sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH, # type: int
351373
**kwargs # type: **str
352374
):
353375
# type: (...) -> None
@@ -366,14 +388,19 @@ def __init__(
366388
underlying file_path's extension will be used to detect the compression.
367389
allow_malformed_records: If true, malformed records from VCF files will be
368390
returned as :class:`MalformedVcfRecord` instead of failing the pipeline.
391+
pre_infer_headers: If true, drop headers and make sure PySam return the
392+
exact data for variants and calls, without type matching.
393+
sample_name_encoding: specify how we want to encode sample_name mainly
394+
to deal with same sample_name used across multiple VCF files
369395
"""
370396
super(ReadAllFromVcf, self).__init__(**kwargs)
371397
source_from_file = partial(
372398
_create_vcf_source,
373399
representative_header_lines=representative_header_lines,
374400
compression_type=compression_type,
375401
allow_malformed_records=allow_malformed_records,
376-
pre_infer_headers=pre_infer_headers)
402+
pre_infer_headers=pre_infer_headers,
403+
sample_name_encoding=sample_name_encoding)
377404
self._read_all_files = filebasedsource.ReadAllFiles(
378405
True, # splittable
379406
CompressionTypes.AUTO, desired_bundle_size,

0 commit comments

Comments
 (0)