Fix to read location parameter properly in BigQueryToBigQueryOperator
VladaZakharova committed Nov 16, 2022
1 parent 24903eb commit bc0063a
Showing 3 changed files with 43 additions and 5 deletions.
airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py
@@ -58,7 +58,11 @@ class BigQueryToBigQueryOperator(BaseOperator):
        encryption_configuration = {
            "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
        }
-    :param location: The location used for the operation.
+    :param location: The geographic location of the job. You must specify the location
+        to run the job if it is not in the US or the EU multi-regional location,
+        or if it is in a single region (for example, us-central1).
+        For more details, see:
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
@@ -131,7 +135,7 @@ def execute(self, context: Context) -> None:
            encryption_configuration=self.encryption_configuration,
        )

-        job = hook.get_job(job_id=job_id).to_api_repr()
+        job = hook.get_job(job_id=job_id, location=self.location).to_api_repr()
        conf = job["configuration"]["copy"]["destinationTable"]
        BigQueryTableLink.persist(
            context=context,
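A minimal usage sketch of the fixed operator follows; it is not part of this commit, and the dataset and table names are placeholders. With the fix, the location passed to the operator now reaches the get_job() lookup, so copy jobs running against single-region datasets can be found:

from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator

copy_regional_table = BigQueryToBigQueryOperator(
    task_id="copy_regional_table",
    source_project_dataset_tables="my_dataset.my_table",  # placeholder
    destination_project_dataset_table="my_dataset_copy.my_table",  # placeholder
    write_disposition="WRITE_EMPTY",
    location="us-central1",  # single-region location; before this fix, get_job() ignored it
)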
tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
@@ -22,14 +22,15 @@

from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator

+BQ_HOOK_PATH = "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook"
TASK_ID = "test-bq-create-table-operator"
TEST_DATASET = "test-dataset"
TEST_TABLE_ID = "test-table-id"


class TestBigQueryToBigQueryOperator(unittest.TestCase):
@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook")
def test_execute(self, mock_hook):
@mock.patch(BQ_HOOK_PATH)
def test_execute_without_location_should_execute_successfully(self, mock_hook):
source_project_dataset_tables = f"{TEST_DATASET}.{TEST_TABLE_ID}"
destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
write_disposition = "WRITE_EMPTY"
@@ -56,3 +57,31 @@ def test_execute(self, mock_hook):
            labels=labels,
            encryption_configuration=encryption_configuration,
        )

+    @mock.patch(BQ_HOOK_PATH)
+    def test_execute_single_regional_location_should_execute_successfully(self, mock_hook):
+        source_project_dataset_tables = f"{TEST_DATASET}.{TEST_TABLE_ID}"
+        destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
+        write_disposition = "WRITE_EMPTY"
+        create_disposition = "CREATE_IF_NEEDED"
+        labels = {"k1": "v1"}
+        location = "us-central1"
+        encryption_configuration = {"key": "kk"}
+        mock_hook.return_value.run_copy.return_value = "job-id"
+
+        operator = BigQueryToBigQueryOperator(
+            task_id=TASK_ID,
+            source_project_dataset_tables=source_project_dataset_tables,
+            destination_project_dataset_table=destination_project_dataset_table,
+            write_disposition=write_disposition,
+            create_disposition=create_disposition,
+            labels=labels,
+            encryption_configuration=encryption_configuration,
+            location=location,
+        )
+
+        operator.execute(context=mock.MagicMock())
+        mock_hook.return_value.get_job.assert_called_once_with(
+            job_id="job-id",
+            location=location,
+        )
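A side note on the patch target, not part of the commit: BQ_HOOK_PATH refers to the name as imported inside the transfer module rather than the hook's defining module, because mock.patch must replace the name where it is looked up. A minimal illustration of the same pattern:

from unittest import mock

with mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook") as mock_hook:
    mock_hook.return_value.run_copy.return_value = "job-id"
    # any BigQueryToBigQueryOperator executed inside this block resolves
    # BigQueryHook to the mock above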
tests/system/providers/google/cloud/bigquery/example_bigquery_to_bigquery.py
@@ -38,6 +38,7 @@
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
ORIGIN = "origin"
TARGET = "target"
+LOCATION = "US"


with models.DAG(
@@ -47,7 +48,11 @@
    catchup=False,
    tags=["example", "bigquery"],
) as dag:
-    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
+    create_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_dataset",
+        dataset_id=DATASET_NAME,
+        location=LOCATION,
+    )

    create_origin_table = BigQueryCreateEmptyTableOperator(
        task_id="create_origin_table",
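The system test's copy task itself falls outside the visible hunks. A sketch of how it would line up with the new LOCATION constant, assuming the copy task mirrors the dataset's location; the task id is illustrative, and the operator import from airflow.providers.google.cloud.transfers.bigquery_to_bigquery is assumed:

copy_selected_data = BigQueryToBigQueryOperator(
    task_id="copy_selected_data",  # illustrative task id
    source_project_dataset_tables=f"{DATASET_NAME}.{ORIGIN}",
    destination_project_dataset_table=f"{DATASET_NAME}.{TARGET}",
    location=LOCATION,  # keeps job creation and the region-aware get_job() lookup consistent
)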
