@@ -389,7 +389,7 @@ def _run_annotation_pipeline(known_args, pipeline_args):
389389def _create_sample_info_table (pipeline , # type: beam.Pipeline
390390 pipeline_mode , # type: PipelineModes
391391 known_args , # type: argparse.Namespace,
392- pipeline_args , # type: List[ str]
392+ temp_directory , # str
393393 ):
394394 # type: (...) -> None
395395 headers = pipeline_common .read_headers (
@@ -399,8 +399,14 @@ def _create_sample_info_table(pipeline, # type: beam.Pipeline
399399 _ = (headers | 'SampleInfoToBigQuery' >>
400400 sample_info_to_bigquery .SampleInfoToBigQuery (
401401 known_args .output_table ,
402+ << << << < HEAD
402403 SampleNameEncoding [known_args .sample_name_encoding ],
403404 known_args .append ))
405+ == == == =
406+ temp_directory ,
407+ known_args .append ,
408+ known_args .samples_span_multiple_files ))
409+ >> >> > >> Address first iteration of comments .
404410
405411
406412def run (argv = None ):
@@ -409,6 +415,8 @@ def run(argv=None):
409415 logging .info ('Command: %s' , ' ' .join (argv or sys .argv ))
410416 known_args , pipeline_args = pipeline_common .parse_args (argv ,
411417 _COMMAND_LINE_OPTIONS )
418+ if known_args .output_table and '--temp_location' not in pipeline_args :
419+ raise ValueError ('--temp_location is required for BigQuery imports.' )
412420 if known_args .auto_flags_experiment :
413421 _get_input_dimensions (known_args , pipeline_args )
414422
@@ -484,6 +492,7 @@ def run(argv=None):
484492 num_shards = 1
485493
486494 if known_args .output_table :
495+ << << << < HEAD
487496 schema_file = tempfile .mkstemp (prefix = known_args .output_table ,
488497 suffix = _BQ_SCHEMA_FILE_SUFFIX )[1 ]
489498 schema = (
@@ -495,6 +504,13 @@ def run(argv=None):
495504 file_to_write .write (schema_json )
496505
497506 for i in range (num_shards ):
507+ == == == =
508+ temp_directory = pipeline_options .PipelineOptions (pipeline_args ).view_as (
509+ pipeline_options .GoogleCloudOptions ).temp_location
510+ if not temp_directory :
511+ raise ValueError ('--temp_location must be set when writing to BigQuery.' )
512+ for i in range (num_partitions ):
513+ > >> >> >> Address first iteration of comments .
498514 table_suffix = ''
499515 if sharding and sharding .get_shard_name (i ):
500516 table_suffix = '_' + sharding .get_shard_name (i )
@@ -511,7 +527,7 @@ def run(argv=None):
511527 known_args .null_numeric_value_replacement )))
512528 if known_args .generate_sample_info_table :
513529 _create_sample_info_table (
514- pipeline , pipeline_mode , known_args , pipeline_args )
530+ pipeline , pipeline_mode , known_args , temp_directory )
515531
516532 if known_args .output_avro_path :
517533 # TODO(bashir2): Add an integration test that outputs to Avro files and
0 commit comments