Skip to content

Commit 6420e96

Browse files
authored
Add the flag for null replacement #68 (#331)
- Add the flag for null numeric replacement. - Add an integration test for this flag. Issue: 68 Tested: unit tests & integration tests.
1 parent e767449 commit 6420e96

File tree

13 files changed

+352
-211
lines changed

13 files changed

+352
-211
lines changed

docs/bigquery_schema.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,9 @@ In addition, the schema from Variant Transforms has the following properties:
3030
does not allow null values in repeated fields (the entire record can be null,
3131
but values within the record must each have a value). For instance, if a
3232
VCF INFO field is `1,.,2`, we cannot load `1,null,2` to BigQuery and need to
33-
use a numeric replacement for the null value. The replacement value is
34-
currently set to `-2^31` (equal to `-2147483648`).
35-
[Issue #68](https://github.com/googlegenomics/gcp-variant-transforms/issues/68)
36-
tracks the feature to make this value configurable. The alternative is to
37-
convert such values to a string and use `.` to represent the null value.
33+
use a numeric replacement for the null value. By default, the replacement
34+
value is set to `-2^31` (equal to `-2147483648`). You can also use
35+
`--null_numeric_value_replacement` to customize this value. The alternative is
36+
to convert such values to a string and use `.` to represent the null value.
3837
To do this, please change the header to specify the type as `String`.
3938

gcp_variant_transforms/libs/bigquery_row_generator.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818

1919
import copy
2020
import json
21-
from typing import Dict, Any # pylint: disable=unused-import
21+
from typing import Any, Dict # pylint: disable=unused-import
2222

2323
from gcp_variant_transforms.beam_io import vcfio
2424
from gcp_variant_transforms.libs import bigquery_schema_descriptor # pylint: disable=unused-import
25+
from gcp_variant_transforms.libs import bigquery_sanitizer
2526
from gcp_variant_transforms.libs import bigquery_util
2627
from gcp_variant_transforms.libs import processed_variant # pylint: disable=unused-import
2728
from gcp_variant_transforms.libs import vcf_field_conflict_resolver # pylint: disable=unused-import
@@ -35,6 +36,7 @@
3536
# Number of bytes to add to the object size when concatenating calls (i.e.
3637
# to account for ", "). We use 5 bytes to be conservative.
3738
_JSON_CONCATENATION_OVERHEAD_BYTES = 5
39+
_BigQuerySchemaSanitizer = bigquery_sanitizer.SchemaSanitizer
3840

3941

4042
class BigQueryRowGenerator(object):
@@ -45,10 +47,13 @@ def __init__(
4547
schema_descriptor, # type: bigquery_schema_descriptor.SchemaDescriptor
4648
conflict_resolver=None,
4749
# type: vcf_field_conflict_resolver.ConflictResolver
50+
null_numeric_value_replacement=None # type: int
4851
):
4952
# type: (...) -> None
5053
self._schema_descriptor = schema_descriptor
5154
self._conflict_resolver = conflict_resolver
55+
self._bigquery_field_sanitizer = bigquery_sanitizer.FieldSanitizer(
56+
null_numeric_value_replacement)
5257

5358
def get_rows(self,
5459
variant,
@@ -124,7 +129,7 @@ def _get_call_record(
124129
"""
125130
call_record = {
126131
bigquery_util.ColumnKeyConstants.CALLS_NAME:
127-
bigquery_util.get_bigquery_sanitized_field(call.name),
132+
self._bigquery_field_sanitizer.get_sanitized_field(call.name),
128133
bigquery_util.ColumnKeyConstants.CALLS_PHASESET: call.phaseset,
129134
bigquery_util.ColumnKeyConstants.CALLS_GENOTYPE: call.genotype or []
130135
}
@@ -150,21 +155,21 @@ def _get_base_row_from_variant(self, variant, allow_incompatible_records):
150155
} # type: Dict[str, Any]
151156
if variant.names:
152157
row[bigquery_util.ColumnKeyConstants.NAMES] = (
153-
bigquery_util.get_bigquery_sanitized_field(variant.names))
158+
self._bigquery_field_sanitizer.get_sanitized_field(variant.names))
154159
if variant.quality is not None:
155160
row[bigquery_util.ColumnKeyConstants.QUALITY] = variant.quality
156161
if variant.filters:
157162
row[bigquery_util.ColumnKeyConstants.FILTER] = (
158-
bigquery_util.get_bigquery_sanitized_field(variant.filters))
163+
self._bigquery_field_sanitizer.get_sanitized_field(variant.filters))
159164
# Add alternate bases.
160165
row[bigquery_util.ColumnKeyConstants.ALTERNATE_BASES] = []
161166
for alt in variant.alternate_data_list:
162167
alt_record = {bigquery_util.ColumnKeyConstants.ALTERNATE_BASES_ALT:
163168
alt.alternate_bases}
164169
for key, data in alt.info.iteritems():
165-
alt_record[bigquery_util.get_bigquery_sanitized_field_name(key)] = (
170+
alt_record[_BigQuerySchemaSanitizer.get_sanitized_field_name(key)] = (
166171
data if key in alt.annotation_field_names else
167-
bigquery_util.get_bigquery_sanitized_field(data))
172+
self._bigquery_field_sanitizer.get_sanitized_field(data))
168173
row[bigquery_util.ColumnKeyConstants.ALTERNATE_BASES].append(alt_record)
169174
# Add info.
170175
for key, data in variant.non_alt_info.iteritems():
@@ -187,14 +192,15 @@ def _get_bigquery_field_entry(
187192
# type: (...) -> (str, Any)
188193
if data is None:
189194
return None, None
190-
field_name = bigquery_util.get_bigquery_sanitized_field_name(key)
195+
field_name = _BigQuerySchemaSanitizer.get_sanitized_field_name(key)
191196
if not schema_descriptor.has_simple_field(field_name):
192197
raise ValueError('BigQuery schema has no such field: {}.\n'
193198
'This can happen if the field is not defined in '
194199
'the VCF headers, or is not inferred automatically. '
195200
'Retry pipeline with --infer_headers.'
196201
.format(field_name))
197-
sanitized_field_data = bigquery_util.get_bigquery_sanitized_field(data)
202+
sanitized_field_data = self._bigquery_field_sanitizer.get_sanitized_field(
203+
data)
198204
field_schema = schema_descriptor.get_field_descriptor(field_name)
199205
field_data, is_compatible = self._check_and_resolve_schema_compatibility(
200206
field_schema, sanitized_field_data)

gcp_variant_transforms/libs/bigquery_row_generator_test.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from gcp_variant_transforms.beam_io import vcfio
2626
from gcp_variant_transforms.libs import bigquery_schema_descriptor
2727
from gcp_variant_transforms.libs import bigquery_row_generator
28-
from gcp_variant_transforms.libs import bigquery_util
28+
from gcp_variant_transforms.libs import bigquery_sanitizer
2929
from gcp_variant_transforms.libs import processed_variant
3030
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
3131
from gcp_variant_transforms.libs.bigquery_util import ColumnKeyConstants
@@ -281,9 +281,13 @@ def test_null_repeated_fields(self):
281281
ColumnKeyConstants.ALTERNATE_BASES: [],
282282
ColumnKeyConstants.FILTER: ['q10'],
283283
ColumnKeyConstants.CALLS: [],
284-
'IIR': [0, 1, bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT],
284+
'IIR': [0,
285+
1,
286+
bigquery_sanitizer._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT],
285287
'IBR': [True, False, False],
286-
'IFR': [0.1, 0.2, bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
288+
'IFR': [0.1,
289+
0.2,
290+
bigquery_sanitizer._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
287291
0.4],
288292
'ISR': ['.', 'data1', 'data2']}
289293
self.assertEqual([expected_row],
@@ -329,22 +333,22 @@ def test_nonstandard_float_values(self):
329333
ColumnKeyConstants.END_POSITION: 12,
330334
ColumnKeyConstants.REFERENCE_BASES: 'CT',
331335
ColumnKeyConstants.ALTERNATE_BASES: [
332-
{'IF3': -bigquery_util._INF_FLOAT_VALUE, 'alt': 'A'},
336+
{'IF3': -bigquery_sanitizer._INF_FLOAT_VALUE, 'alt': 'A'},
333337
{'IF3': None, 'alt': 'C'},
334-
{'IF3': bigquery_util._INF_FLOAT_VALUE, 'alt': 'T'},
338+
{'IF3': bigquery_sanitizer._INF_FLOAT_VALUE, 'alt': 'T'},
335339
{'IF3': 1.2, 'alt': 'TC'}
336340
],
337341
ColumnKeyConstants.CALLS: [
338342
{
339343
ColumnKeyConstants.CALLS_NAME: 'Sample1',
340344
ColumnKeyConstants.CALLS_GENOTYPE: [0, 1],
341345
ColumnKeyConstants.CALLS_PHASESET: '*',
342-
'GQ': bigquery_util._INF_FLOAT_VALUE
346+
'GQ': bigquery_sanitizer._INF_FLOAT_VALUE
343347
}
344348
],
345-
'IF': bigquery_util._INF_FLOAT_VALUE,
346-
'IFR': [-bigquery_util._INF_FLOAT_VALUE,
347-
bigquery_util._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
349+
'IF': bigquery_sanitizer._INF_FLOAT_VALUE,
350+
'IFR': [-bigquery_sanitizer._INF_FLOAT_VALUE,
351+
bigquery_sanitizer._DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT,
348352
1.2],
349353
'IF2': None
350354
}
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
# Copyright 2018 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Sanitizes BigQuery schema and field according to BigQuery restrictions."""
16+
17+
import math
18+
import re
19+
import sys
20+
from typing import List, Optional # pylint: disable=unused-import
21+
22+
from gcp_variant_transforms.beam_io import vcfio
23+
24+
# Prefix to use when the first character of the field name is not [a-zA-Z]
25+
# as required by BigQuery.
26+
_FALLBACK_FIELD_NAME_PREFIX = 'field_'
27+
28+
# A big number to represent infinite float values. The division by 10 is to
29+
# prevent unintentional overflows when doing subsequent operations.
30+
_INF_FLOAT_VALUE = sys.float_info.max / 10
31+
_DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT = -2 ^ 31
32+
33+
34+
class SchemaSanitizer(object):
35+
"""Class to sanitize BigQuery schema according to BigQuery restrictions."""
36+
37+
@staticmethod
38+
def get_sanitized_string(input_str):
39+
# type: (str) -> unicode
40+
"""Returns a unicode as BigQuery API does not support UTF-8 strings."""
41+
return _decode_utf8_string(input_str)
42+
43+
@staticmethod
44+
def get_sanitized_field_name(field_name):
45+
# type: (str) -> str
46+
"""Returns the sanitized field name according to BigQuery restrictions.
47+
48+
BigQuery field names must follow `[a-zA-Z][a-zA-Z0-9_]*`. This method
49+
converts any unsupported characters to an underscore. Also, if the first
50+
character does not match `[a-zA-Z]`, it prepends
51+
`_FALLBACK_FIELD_NAME_PREFIX` to the name.
52+
53+
Args:
54+
field_name: Name of the field to sanitize.
55+
Returns:
56+
Sanitized field name with unsupported characters replaced with an
57+
underscore. It also prepends the name with `_FALLBACK_FIELD_NAME_PREFIX`
58+
if the first character does not match `[a-zA-Z]`.
59+
"""
60+
assert field_name # field_name must not be empty by this stage.
61+
if not re.match('[a-zA-Z]', field_name[0]):
62+
field_name = _FALLBACK_FIELD_NAME_PREFIX + field_name
63+
return re.sub('[^a-zA-Z0-9_]', '_', field_name)
64+
65+
66+
class FieldSanitizer(object):
67+
"""Class to sanitize field values according to BigQuery restrictions."""
68+
69+
def __init__(self, null_numeric_value_replacement):
70+
# type: (Optional[int]) -> None
71+
"""Initializes a `BigQueryFieldSanitizer`.
72+
73+
Args:
74+
null_numeric_value_replacement: Value to use instead of null for
75+
numeric (float/int/long) lists. For instance, [0, None, 1] will become
76+
[0, `null_numeric_value_replacement`, 1].
77+
"""
78+
self._null_numeric_value_replacement = (
79+
null_numeric_value_replacement or
80+
_DEFAULT_NULL_NUMERIC_VALUE_REPLACEMENT)
81+
82+
def get_sanitized_field(self, field):
83+
# type: (Any) -> Any
84+
"""Returns sanitized field according to BigQuery restrictions.
85+
86+
This method only sanitizes lists and strings. It returns the same `field`
87+
for all other types (including None).
88+
89+
For lists, null values are replaced with reasonable defaults since the
90+
BigQuery API does not allow null values in lists (note that the entire
91+
list is allowed to be null). For instance, [0, None, 1] becomes
92+
[0, `null_numeric_value_replacement`, 1].
93+
Null value replacements are:
94+
- `False` for bool.
95+
- `.` for string (null string values should not exist in Variants parsed
96+
using PyVCF though).
97+
- `null_numeric_value_replacement` for float/int/long.
98+
99+
For strings, it returns its unicode representation. The BigQuery API does
100+
not support strings that are UTF-8 encoded.
101+
102+
Args:
103+
field: Field to sanitize. It can be of any type.
104+
105+
Raises:
106+
ValueError: If the field could not be sanitized (e.g. unsupported types in
107+
lists).
108+
"""
109+
if not field:
110+
return field
111+
if isinstance(field, basestring):
112+
return self._get_sanitized_string(field)
113+
elif isinstance(field, float):
114+
return self._get_sanitized_float(field)
115+
elif isinstance(field, list):
116+
return self._get_sanitized_list(field)
117+
else:
118+
return field
119+
120+
def _get_sanitized_list(self, input_list):
121+
# type: (List) -> List
122+
"""Returns sanitized list according to BigQuery restrictions.
123+
124+
Null values are replaced with reasonable defaults since the
125+
BigQuery API does not allow null values in lists (note that the entire
126+
list is allowed to be null). For instance, [0, None, 1] becomes
127+
[0, `null_numeric_value_replacement`, 1].
128+
Null value replacements are:
129+
- `False` for bool.
130+
- `.` for string (null string values should not exist in Variants parsed
131+
using PyVCF though).
132+
- `null_numeric_value_replacement` for float/int/long.
133+
Lists that contain strings are also sanitized according to the
134+
`_get_sanitized_string` method.
135+
136+
Args:
137+
input_list: List to sanitize.
138+
139+
Raises:
140+
ValueError: If a list contains unsupported values. Supported types are
141+
basestring, bool, int, long, and float.
142+
"""
143+
null_replacement_value = None
144+
for i in input_list:
145+
if i is None:
146+
continue
147+
if isinstance(i, basestring):
148+
null_replacement_value = vcfio.MISSING_FIELD_VALUE
149+
elif isinstance(i, bool):
150+
null_replacement_value = False
151+
elif isinstance(i, (int, long, float)):
152+
null_replacement_value = self._null_numeric_value_replacement
153+
else:
154+
raise ValueError('Unsupported value for input: %s' % str(i))
155+
break # Assumption is that all fields have the same type.
156+
if null_replacement_value is None: # Implies everything was None.
157+
return []
158+
sanitized_list = []
159+
for i in input_list:
160+
if i is None:
161+
i = null_replacement_value
162+
elif isinstance(i, basestring):
163+
i = self._get_sanitized_string(i)
164+
elif isinstance(i, float):
165+
sanitized_float = self._get_sanitized_float(i)
166+
i = (sanitized_float if sanitized_float is not None
167+
else null_replacement_value)
168+
sanitized_list.append(i)
169+
return sanitized_list
170+
171+
def _get_sanitized_float(self, input_float):
172+
"""Returns a sanitized float for BigQuery.
173+
174+
This method replaces INF and -INF with positive and negative numbers with
175+
huge absolute values, and replaces NaN with None. It returns the same value
176+
for all other values.
177+
"""
178+
if input_float == float('inf'):
179+
return _INF_FLOAT_VALUE
180+
elif input_float == float('-inf'):
181+
return -_INF_FLOAT_VALUE
182+
elif math.isnan(input_float):
183+
return None
184+
else:
185+
return input_float
186+
187+
def _get_sanitized_string(self, input_str):
188+
# type: (str) -> unicode
189+
"""Returns a unicode as BigQuery API does not support UTF-8 strings."""
190+
return _decode_utf8_string(input_str)
191+
192+
193+
def _decode_utf8_string(input_str):
194+
# type: (str) -> unicode
195+
try:
196+
return (input_str if isinstance(input_str, unicode)
197+
else input_str.decode('utf-8'))
198+
except UnicodeDecodeError:
199+
raise ValueError('input_str is not UTF-8: %s ' % (input_str))

0 commit comments

Comments
 (0)