Skip to content

Commit ba98509

Browse files
authored
Introduce PySam as an alternative parsing library. (#491)
1 parent 8049c19 commit ba98509

19 files changed

+1491
-153
lines changed

docker/Dockerfile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,20 @@ ADD / /opt/gcp_variant_transforms/src/
3232
# Needed for installing mmh3 (one of the required packages in setup.py).
3333
RUN apt install -y g++
3434

35+
# Install Pysam dependencies. These dependencies are only required becuase we
36+
# have a monolithic binary - they primarily have to be installed on the workers.
37+
RUN apt-get update && apt-get install -y \
38+
autoconf \
39+
automake \
40+
gcc \
41+
libbz2-dev \
42+
libcurl4-openssl-dev \
43+
liblzma-dev \
44+
libssl-dev \
45+
make \
46+
perl \
47+
zlib1g-dev
48+
3549
# Install dependencies.
3650
RUN python -m pip install --upgrade pip && \
3751
python -m pip install --upgrade virtualenv && \

gcp_variant_transforms/beam_io/vcf_header_io.py

Lines changed: 170 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
from __future__ import absolute_import
1818

19-
from collections import OrderedDict
19+
import collections
2020
from functools import partial
2121
from typing import Dict, Iterable, List # pylint: disable=unused-import
22+
from pysam import libcbcf
2223
import vcf
2324

25+
2426
import apache_beam as beam
2527
from apache_beam.io import filebasedsource
2628
from apache_beam.io import range_trackers # pylint: disable=unused-import
@@ -32,6 +34,8 @@
3234
from gcp_variant_transforms.beam_io import bgzf
3335
from gcp_variant_transforms.beam_io import vcfio
3436

37+
LAST_HEADER_LINE_PREFIX = '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO'
38+
3539

3640
class VcfHeaderFieldTypeConstants(object):
3741
"""Constants for types from VCF header."""
@@ -52,18 +56,34 @@ class VcfParserHeaderKeyConstants(object):
5256
VERSION = 'version'
5357
LENGTH = 'length'
5458

59+
VcfHeaderInfoField = collections.namedtuple(
60+
'Info', ['id', 'num', 'type', 'desc', 'source', 'version'])
61+
62+
VcfHeaderFormatField = collections.namedtuple(
63+
'Format', ['id', 'num', 'type', 'desc'])
64+
65+
VCF_HEADER_INFO_NUM_FIELD_CONVERSION = {
66+
None: '.',
67+
-1: 'A', # PyVCF value when number of alternate alleles dictates the count
68+
-2: 'G', # PyVCF value when number of genotypes dictates the count
69+
-3: 'R', # PyVCF value when number of alleles and ref dictates the count
70+
'A': 'A', # PySam value when number of alternate alleles dictates the count
71+
'G': 'G', # PySam value when number of genotypes dictates the count
72+
'R': 'R', # PySam value when number of alleles and ref dictates the count
73+
}
5574

5675
class VcfHeader(object):
5776
"""Container for header data."""
5877

5978
def __init__(self,
60-
infos=None, # type: OrderedDict[str, vcf.parser._Info]
61-
filters=None, # type: OrderedDict[str, vcf.parser._Filter]
62-
alts=None, # type: OrderedDict[str, vcf.parser._Alt]
63-
formats=None, # type: OrderedDict[str, vcf.parser._Format]
64-
contigs=None, # type: OrderedDict[str, vcf.parser._Contig]
65-
samples=None, # type: List[str]
66-
file_path=None # type: str
79+
infos=None, # type: Any
80+
filters=None, # type: Any
81+
alts=None, # type: Any
82+
formats=None, # type: Any
83+
contigs=None, # type: Any
84+
samples=None, # type: Any
85+
file_path=None, # type: str
86+
vcf_parser=vcfio.VcfParserType.PYVCF # type: vcfio.VcfParserType
6787
):
6888
# type: (...) -> None
6989
"""Initializes a VcfHeader object.
@@ -81,13 +101,21 @@ def __init__(self,
81101
samples: A list of sample names.
82102
file_path: The full file path of the vcf file.
83103
"""
84-
# type: OrderedDict[str, OrderedDict]
85-
self.infos = self._values_asdict(infos or {})
86-
self.filters = self._values_asdict(filters or {})
87-
self.alts = self._values_asdict(alts or {})
88-
self.formats = self._values_asdict(formats or {})
89-
self.contigs = self._values_asdict(contigs or {})
90-
self.samples = samples
104+
# type: collections.OrderedDict[str, collections.OrderedDict]
105+
if vcf_parser == vcfio.VcfParserType.PYSAM:
106+
self.infos = self._get_infos_pysam(infos)
107+
self.filters = self._get_filters_pysam(filters)
108+
self.alts = self._get_alts_pysam(alts)
109+
self.formats = self._get_formats_pysam(formats)
110+
self.contigs = self._get_contigs_pysam(contigs)
111+
self.samples = self._get_samples_pysam(samples)
112+
else:
113+
self.infos = self._values_asdict(infos or {})
114+
self.filters = self._values_asdict(filters or {})
115+
self.alts = self._values_asdict(alts or {})
116+
self.formats = self._values_asdict(formats or {})
117+
self.contigs = self._values_asdict(contigs or {})
118+
self.samples = samples
91119
self.file_path = file_path
92120

93121
def __eq__(self, other):
@@ -106,14 +134,75 @@ def __repr__(self):
106134

107135
def _values_asdict(self, header):
108136
"""Converts PyVCF header values to ordered dictionaries."""
109-
ordered_dict = OrderedDict()
137+
ordered_dict = collections.OrderedDict()
110138
for key in header:
111139
# These methods were not designed to be protected. They start with an
112140
# underscore to avoid conflicts with field names. For more info, see
113141
# https://docs.python.org/2/library/collections.html#collections.namedtuple
114142
ordered_dict[key] = header[key]._asdict() # pylint: disable=W0212
115143
return ordered_dict
116144

145+
def _get_infos_pysam(self, infos):
146+
results = collections.OrderedDict()
147+
for item in infos.items():
148+
result = collections.OrderedDict()
149+
result['id'] = item[0]
150+
result['num'] = item[1].number if item[1].number != '.' else None
151+
result['type'] = item[1].type
152+
result['desc'] = item[1].description
153+
# Pysam doesn't return these fields in info
154+
result['source'] = None
155+
result['version'] = None
156+
results[item[0]] = result
157+
return dict(results.items())
158+
159+
def _get_filters_pysam(self, filters):
160+
results = collections.OrderedDict()
161+
for item in filters.items():
162+
result = collections.OrderedDict()
163+
result['id'] = item[0]
164+
result['desc'] = item[1].description
165+
results[item[0]] = result
166+
# PySAM adds default PASS value to its filters
167+
del results['PASS']
168+
return dict(results.items())
169+
170+
def _get_alts_pysam(self, alts):
171+
results = collections.OrderedDict()
172+
for item in alts.items():
173+
result = collections.OrderedDict()
174+
result['id'] = item[0]
175+
result['desc'] = item[1]['Description'].strip("\"")
176+
results[item[0]] = result
177+
return dict(results.items())
178+
179+
def _get_formats_pysam(self, formats):
180+
results = collections.OrderedDict()
181+
for item in formats.items():
182+
result = collections.OrderedDict()
183+
result['id'] = item[0]
184+
result['num'] = item[1].number if item[1].number != '.' else None
185+
result['type'] = item[1].type
186+
result['desc'] = item[1].description
187+
results[item[0]] = result
188+
return dict(results.items())
189+
190+
def _get_contigs_pysam(self, contigs):
191+
results = collections.OrderedDict()
192+
for item in contigs.items():
193+
result = collections.OrderedDict()
194+
result['id'] = item[0]
195+
result['length'] = item[1].length
196+
results[item[0]] = result
197+
return dict(results.items())
198+
199+
def _get_samples_pysam(self, sample_line):
200+
sample_tags = sample_line.split('\t')
201+
if len(sample_tags) > 9:
202+
return sample_tags[9:]
203+
else:
204+
return []
205+
117206

118207
class VcfHeaderSource(filebasedsource.FileBasedSource):
119208
"""A source for reading VCF file headers.
@@ -124,15 +213,28 @@ class VcfHeaderSource(filebasedsource.FileBasedSource):
124213
def __init__(self,
125214
file_pattern,
126215
compression_type=CompressionTypes.AUTO,
127-
validate=True):
128-
# type: (str, str, bool) -> None
216+
validate=True,
217+
vcf_parser=vcfio.VcfParserType.PYVCF):
218+
# type: (str, str, bool, vcfio.VcfParserType) -> None
129219
super(VcfHeaderSource, self).__init__(file_pattern,
130220
compression_type=compression_type,
131221
validate=validate,
132222
splittable=False)
133223
self._compression_type = compression_type
224+
self._vcf_parser = vcf_parser
134225

135226
def read_records(
227+
self,
228+
file_path, # type: str
229+
unused_range_tracker, # type: range_trackers.UnsplittableRangeTracker
230+
):
231+
# type: (...) -> Iterable[VcfHeader]
232+
if self._vcf_parser == vcfio.VcfParserType.PYSAM:
233+
return self._read_records_pysam(file_path, unused_range_tracker)
234+
else:
235+
return self._read_records_pyvcf(file_path, unused_range_tracker)
236+
237+
def _read_records_pyvcf(
136238
self,
137239
file_path, # type: str
138240
unused_range_tracker # type: range_trackers.UnsplittableRangeTracker
@@ -142,26 +244,54 @@ def read_records(
142244
vcf_reader = vcf.Reader(fsock=self._read_headers(file_path))
143245
except StopIteration:
144246
raise ValueError('{} has no header.'.format(file_path))
145-
146247
yield VcfHeader(infos=vcf_reader.infos,
147248
filters=vcf_reader.filters,
148249
alts=vcf_reader.alts,
149250
formats=vcf_reader.formats,
150251
contigs=vcf_reader.contigs,
151252
samples=vcf_reader.samples,
152-
file_path=file_path)
253+
file_path=file_path,
254+
vcf_parser=vcfio.VcfParserType.PYVCF)
153255

154256
def _read_headers(self, file_path):
155257
with self.open_file(file_path) as file_to_read:
156258
while True:
157259
record = file_to_read.readline()
158-
while not record or not record.strip(): # Skip empty lines.
260+
while record and not record.strip(): # Skip empty lines.
159261
record = file_to_read.readline()
160262
if record and record.startswith('#'):
161-
yield record
263+
yield record.strip()
162264
else:
163265
break
164266

267+
def _read_records_pysam(
268+
self,
269+
file_path, # type: str
270+
unused_range_tracker # type: range_trackers.UnsplittableRangeTracker
271+
):
272+
# type: (...) -> Iterable[VcfHeader]
273+
header = libcbcf.VariantHeader()
274+
lines = self._read_headers(file_path)
275+
sample_line = LAST_HEADER_LINE_PREFIX
276+
header.add_line('##fileformat=VCFv4.0')
277+
for line in lines:
278+
if line.startswith('#'):
279+
if line.startswith(LAST_HEADER_LINE_PREFIX):
280+
sample_line = line.strip()
281+
break
282+
else:
283+
header.add_line(line.strip())
284+
else:
285+
break
286+
yield VcfHeader(infos=header.info,
287+
filters=header.filters,
288+
alts=header.alts,
289+
formats=header.formats,
290+
contigs=header.contigs,
291+
samples=sample_line,
292+
file_path=file_path,
293+
vcf_parser=vcfio.VcfParserType.PYSAM)
294+
165295
def open_file(self, file_path):
166296
if self._compression_type == CompressionTypes.GZIP:
167297
return bgzf.open_bgzf(file_path)
@@ -181,6 +311,7 @@ def __init__(
181311
file_pattern, # type: str
182312
compression_type=CompressionTypes.AUTO, # type: str
183313
validate=True, # type: bool
314+
vcf_parser=vcfio.VcfParserType.PYVCF, # type: vcfio.VcfParserType
184315
**kwargs # type: **str
185316
):
186317
# type: (...) -> None
@@ -198,15 +329,22 @@ def __init__(
198329
"""
199330
super(ReadVcfHeaders, self).__init__(**kwargs)
200331
self._source = VcfHeaderSource(
201-
file_pattern, compression_type, validate=validate)
332+
file_pattern,
333+
compression_type,
334+
validate=validate,
335+
vcf_parser=vcf_parser)
202336

203337
def expand(self, pvalue):
204338
return pvalue.pipeline | Read(self._source)
205339

206340

207-
def _create_vcf_header_source(file_pattern=None, compression_type=None):
341+
def _create_vcf_header_source(
342+
file_pattern=None,
343+
compression_type=None,
344+
vcf_parser=vcfio.VcfParserType.PYVCF):
208345
return VcfHeaderSource(file_pattern=file_pattern,
209-
compression_type=compression_type)
346+
compression_type=compression_type,
347+
vcf_parser=vcf_parser)
210348

211349

212350
class ReadAllVcfHeaders(PTransform):
@@ -226,8 +364,9 @@ def __init__(
226364
self,
227365
desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
228366
compression_type=CompressionTypes.AUTO,
367+
vcf_parser=vcfio.VcfParserType.PYVCF,
229368
**kwargs):
230-
# type: (int, str, **str) -> None
369+
# type: (int, str, **str, vcfio.VcfParserType) -> None
231370
"""Initialize the :class:`ReadAllVcfHeaders` transform.
232371
233372
Args:
@@ -242,7 +381,9 @@ def __init__(
242381
"""
243382
super(ReadAllVcfHeaders, self).__init__(**kwargs)
244383
source_from_file = partial(
245-
_create_vcf_header_source, compression_type=compression_type)
384+
_create_vcf_header_source,
385+
compression_type=compression_type,
386+
vcf_parser=vcf_parser)
246387
self._read_all_files = filebasedsource.ReadAllFiles(
247388
False, # splittable (we are just reading the headers)
248389
CompressionTypes.AUTO, desired_bundle_size,
@@ -406,9 +547,8 @@ def _format_number(self, number):
406547
return None
407548
elif number >= 0:
408549
return str(number)
409-
number_to_string = {v: k for k, v in vcf.parser.field_counts.items()}
410-
if number in number_to_string:
411-
return number_to_string[number]
550+
if number in VCF_HEADER_INFO_NUM_FIELD_CONVERSION:
551+
return VCF_HEADER_INFO_NUM_FIELD_CONVERSION[number]
412552
else:
413553
raise ValueError('Invalid value for number: {}'.format(number))
414554

0 commit comments

Comments
 (0)