[AIRFLOW-10672] Refactor BigQueryToGCSOperator to use new method (#10773)

Makes BigQueryToGCSOperator use the BigQueryHook.insert_job method
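
Roughly, the change means execute() now assembles a BigQuery extract-job configuration and submits it with BigQueryHook.insert_job rather than going through a connection cursor. A minimal standalone sketch of that call, assuming a hypothetical connection id, table, and bucket (the configuration keys mirror the diff below):

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
    from google.cloud.bigquery.table import TableReference

    hook = BigQueryHook(gcp_conn_id='google_cloud_default')  # connection id is an assumption

    # Build the same 'extract' configuration shape the operator now creates in execute().
    table_ref = TableReference.from_string('my-project.my_dataset.my_table')  # hypothetical table
    configuration = {
        'extract': {
            'sourceTable': table_ref.to_api_repr(),
            'compression': 'NONE',
            'destinationUris': ['gs://my-bucket/export-*.csv'],  # hypothetical bucket
            'destinationFormat': 'CSV',
            # fieldDelimiter/printHeader are only valid for CSV exports, as the diff notes.
            'fieldDelimiter': ',',
            'printHeader': True,
        }
    }

    hook.insert_job(configuration=configuration)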

Committer: Mateusz Kukieła <[email protected]>
MateuszKukiela committed Sep 7, 2020
1 parent c8ee455 commit f14f379
Showing 2 changed files with 47 additions and 21 deletions.
38 changes: 26 additions & 12 deletions airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -19,7 +19,9 @@
This module contains Google BigQuery to Google Cloud Storage operator.
"""
import warnings
from typing import Dict, List, Optional, Sequence, Union
from typing import Any, Dict, List, Optional, Sequence, Union

from google.cloud.bigquery.table import TableReference

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
@@ -139,14 +141,26 @@ def execute(self, context):
location=self.location,
impersonation_chain=self.impersonation_chain,
)
conn = hook.get_conn()
cursor = conn.cursor()
cursor.run_extract(
source_project_dataset_table=self.source_project_dataset_table,
destination_cloud_storage_uris=self.destination_cloud_storage_uris,
compression=self.compression,
export_format=self.export_format,
field_delimiter=self.field_delimiter,
print_header=self.print_header,
labels=self.labels,
)

table_ref = TableReference.from_string(self.source_project_dataset_table, hook.project_id)

configuration: Dict[str, Any] = {
'extract': {
'sourceTable': table_ref.to_api_repr(),
'compression': self.compression,
'destinationUris': self.destination_cloud_storage_uris,
'destinationFormat': self.export_format,
}
}

if self.labels:
configuration['labels'] = self.labels

if self.export_format == 'CSV':
# Only set fieldDelimiter and printHeader fields if using CSV.
# Google does not like it if you set these fields for other export
# formats.
configuration['extract']['fieldDelimiter'] = self.field_delimiter
configuration['extract']['printHeader'] = self.print_header

hook.insert_job(configuration=configuration)
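
For reference, a minimal usage sketch of the refactored operator in a DAG; the operator's public interface is unchanged by this commit, and the DAG id, dates, table, bucket, and labels below are hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

    with DAG(
        dag_id='example_bigquery_to_gcs',  # hypothetical DAG id
        start_date=datetime(2020, 9, 1),
        schedule_interval=None,
    ) as dag:
        export_table = BigQueryToGCSOperator(
            task_id='export_table_to_gcs',
            source_project_dataset_table='my-project.my_dataset.my_table',   # hypothetical
            destination_cloud_storage_uris=['gs://my-bucket/export-*.csv'],  # hypothetical
            export_format='CSV',
            field_delimiter=',',
            print_header=True,
            labels={'team': 'data-eng'},  # hypothetical label
        )
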
30 changes: 21 additions & 9 deletions tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -25,6 +25,7 @@
TASK_ID = 'test-bq-create-table-operator'
TEST_DATASET = 'test-dataset'
TEST_TABLE_ID = 'test-table-id'
PROJECT_ID = 'test-project-id'


class TestBigQueryToCloudStorageOperator(unittest.TestCase):
@@ -38,6 +39,24 @@ def test_execute(self, mock_hook):
print_header = True
labels = {'k1': 'v1'}

mock_hook().project_id = PROJECT_ID

configuration = {
'extract': {
'sourceTable': {
'projectId': mock_hook().project_id,
'datasetId': TEST_DATASET,
'tableId': TEST_TABLE_ID,
},
'compression': compression,
'destinationUris': destination_cloud_storage_uris,
'destinationFormat': export_format,
'fieldDelimiter': field_delimiter,
'printHeader': print_header,
},
'labels': labels,
}

operator = BigQueryToGCSOperator(
task_id=TASK_ID,
source_project_dataset_table=source_project_dataset_table,
@@ -50,12 +69,5 @@
)

operator.execute(None)
mock_hook.return_value.get_conn.return_value.cursor.return_value.run_extract.assert_called_once_with(
source_project_dataset_table=source_project_dataset_table,
destination_cloud_storage_uris=destination_cloud_storage_uris,
compression=compression,
export_format=export_format,
field_delimiter=field_delimiter,
print_header=print_header,
labels=labels,
)

mock_hook.return_value.insert_job.assert_called_once_with(configuration=configuration)
