
Commit bc0063a

Fix to read location parameter properly in BigQueryToBigQueryOperator (#27661)
1 parent 24903eb

3 files changed: +43 −5 lines

airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py
(6 additions, 2 deletions)

@@ -58,7 +58,11 @@ class BigQueryToBigQueryOperator(BaseOperator):
         encryption_configuration = {
             "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key"
         }
-    :param location: The location used for the operation.
+    :param location: The geographic location of the job. You must specify the location to run the job if
+        the location to run a job is not in the US or the EU multi-regional location or
+        the location is in a single region (for example, us-central1).
+        For more details check:
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
     :param impersonation_chain: Optional service account to impersonate using short-term
         credentials, or chained list of accounts required to get the access_token
         of the last account in the list, which will be impersonated in the request.
@@ -131,7 +135,7 @@ def execute(self, context: Context) -> None:
             encryption_configuration=self.encryption_configuration,
         )

-        job = hook.get_job(job_id=job_id).to_api_repr()
+        job = hook.get_job(job_id=job_id, location=self.location).to_api_repr()
         conf = job["configuration"]["copy"]["destinationTable"]
         BigQueryTableLink.persist(
             context=context,
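
Why the one-line change in execute matters: the BigQuery API resolves job IDs per location, so a lookup that omits the location falls back to the client's default (the US multi-region) and cannot find a job that ran in a single region such as us-central1. A minimal sketch of the same lookup against the underlying google-cloud-bigquery client, which BigQueryHook wraps; the project and job IDs are hypothetical:

```python
# Sketch only: "my-project" and "job-id" are made-up names.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")

# Without an explicit location the lookup uses the client's default
# location, so a job that ran in us-central1 may raise NotFound:
#   client.get_job("job-id")

# Passing the job's location resolves it reliably:
job = client.get_job("job-id", location="us-central1")
print(job.state)  # e.g. "DONE"
```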

tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
(31 additions, 2 deletions)

@@ -22,14 +22,15 @@

 from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator

+BQ_HOOK_PATH = "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook"
 TASK_ID = "test-bq-create-table-operator"
 TEST_DATASET = "test-dataset"
 TEST_TABLE_ID = "test-table-id"


 class TestBigQueryToBigQueryOperator(unittest.TestCase):
-    @mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook")
-    def test_execute(self, mock_hook):
+    @mock.patch(BQ_HOOK_PATH)
+    def test_execute_without_location_should_execute_successfully(self, mock_hook):
         source_project_dataset_tables = f"{TEST_DATASET}.{TEST_TABLE_ID}"
         destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
         write_disposition = "WRITE_EMPTY"
@@ -56,3 +57,31 @@ def test_execute(self, mock_hook):
             labels=labels,
             encryption_configuration=encryption_configuration,
         )
+
+    @mock.patch(BQ_HOOK_PATH)
+    def test_execute_single_regional_location_should_execute_successfully(self, mock_hook):
+        source_project_dataset_tables = f"{TEST_DATASET}.{TEST_TABLE_ID}"
+        destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
+        write_disposition = "WRITE_EMPTY"
+        create_disposition = "CREATE_IF_NEEDED"
+        labels = {"k1": "v1"}
+        location = "us-central1"
+        encryption_configuration = {"key": "kk"}
+        mock_hook.return_value.run_copy.return_value = "job-id"
+
+        operator = BigQueryToBigQueryOperator(
+            task_id=TASK_ID,
+            source_project_dataset_tables=source_project_dataset_tables,
+            destination_project_dataset_table=destination_project_dataset_table,
+            write_disposition=write_disposition,
+            create_disposition=create_disposition,
+            labels=labels,
+            encryption_configuration=encryption_configuration,
+            location=location,
+        )
+
+        operator.execute(context=mock.MagicMock())
+        mock_hook.return_value.get_job.assert_called_once_with(
+            job_id="job-id",
+            location=location,
+        )
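
For readers unfamiliar with the patching here: the hook is mocked at the path where the transfer module imports it (hoisted into the BQ_HOOK_PATH constant so both tests share it), so no real BigQuery calls are made and the test can assert on the get_job call. A stripped-down sketch of that pattern, assuming an Airflow dev environment and using made-up dataset/table names:

```python
# Sketch of the test's patching pattern; "ds.src" and "ds.dst" are
# hypothetical tables, and the operator relies on its default
# write/create dispositions.
from unittest import mock

from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import (
    BigQueryToBigQueryOperator,
)

BQ_HOOK_PATH = "airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook"

with mock.patch(BQ_HOOK_PATH) as mock_hook:
    mock_hook.return_value.run_copy.return_value = "job-id"
    op = BigQueryToBigQueryOperator(
        task_id="copy",
        source_project_dataset_tables="ds.src",
        destination_project_dataset_table="ds.dst",
        location="us-central1",
    )
    op.execute(context=mock.MagicMock())
    # The regression this commit fixes: location must reach get_job.
    mock_hook.return_value.get_job.assert_called_once_with(
        job_id="job-id", location="us-central1"
    )
```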

tests/system/providers/google/cloud/bigquery/example_bigquery_to_bigquery.py
(6 additions, 1 deletion)

@@ -38,6 +38,7 @@
 DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 ORIGIN = "origin"
 TARGET = "target"
+LOCATION = "US"


 with models.DAG(
@@ -47,7 +48,11 @@
     catchup=False,
     tags=["example", "bigquery"],
 ) as dag:
-    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
+    create_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id="create_dataset",
+        dataset_id=DATASET_NAME,
+        location=LOCATION,
+    )

     create_origin_table = BigQueryCreateEmptyTableOperator(
         task_id="create_origin_table",
