Skip to content

Commit

Permalink
Fix BIGQUERY_JOB_DETAILS_LINK_FMT in BigQueryConsoleLink (#31953)
Browse files Browse the repository at this point in the history
Co-authored-by: Beata Kossakowska <[email protected]>
  • Loading branch information
bkossakowska and Beata Kossakowska committed Jun 20, 2023
1 parent 79bcc2e commit 2a79fb7
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
19 changes: 13 additions & 6 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
BigQueryIntervalCheckTrigger,
BigQueryValueCheckTrigger,
)
from airflow.providers.google.cloud.utils.bigquery import convert_job_id

if TYPE_CHECKING:
from google.cloud.bigquery import UnknownJob
Expand Down Expand Up @@ -90,8 +91,8 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
job_id_path = XCom.get_value(key="job_id_path", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id_path) if job_id_path else ""


@attr.s(auto_attribs=True)
Expand All @@ -110,7 +111,7 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
job_ids = XCom.get_value(key="job_id_path", ti_key=ti_key)
if not job_ids:
return None
if len(job_ids) < self.index:
Expand Down Expand Up @@ -1184,7 +1185,11 @@ def execute(self, context: Context):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
context["task_instance"].xcom_push(key="job_id", value=job_id)
project_id = self.hook.project_id
if project_id:
job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
context["task_instance"].xcom_push(key="job_id_path", value=job_id_path)
return job_id

def on_kill(self) -> None:
super().on_kill()
Expand Down Expand Up @@ -2727,9 +2732,11 @@ def execute(self, context: Any):
persist_kwargs["dataset_id"] = table["datasetId"]
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)

self.job_id = job.job_id
context["ti"].xcom_push(key="job_id", value=self.job_id)
project_id = self.project_id or self.hook.project_id
if project_id:
job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
context["ti"].xcom_push(key="job_id_path", value=job_id_path)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
17 changes: 17 additions & 0 deletions airflow/providers/google/cloud/utils/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from typing import Any


def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
"""
Expand All @@ -34,3 +36,18 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
return string_field == "true"
else:
return string_field


def convert_job_id(job_id: str | list[str], project_id: str, location: str | None) -> Any:
    """
    Build the qualified job path ``project_id:location:job_id``.

    A single job ID yields a single path string; a list of job IDs yields a
    list of path strings in the same order.

    :param job_id: Required. The ID of the job, or a list of job IDs.
    :param project_id: Required. The ID of the Google Cloud project used as the first path segment.
    :param location: Optional. The location used as the second path segment;
        when it is ``None`` or empty, ``"US"`` is used instead.
    :return: str or list[str] of project_id:location:job_id.
    """
    # The project/location prefix is identical for every job, so build it once.
    prefix = f"{project_id}:{location or 'US'}"
    if not isinstance(job_id, list):
        return f"{prefix}:{job_id}"
    return [f"{prefix}:{single_job_id}" for single_job_id in job_id]
4 changes: 2 additions & 2 deletions tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def test_should_raise_403_forbidden(self):
@mock_plugin_manager(plugins=[])
def test_should_respond_200(self):
XCom.set(
key="job_id",
key="job_id_path",
value="TEST_JOB_ID",
task_id="TEST_SINGLE_QUERY",
dag_id=self.dag.dag_id,
Expand Down Expand Up @@ -171,7 +171,7 @@ def test_should_respond_200_missing_xcom(self):
@mock_plugin_manager(plugins=[])
def test_should_respond_200_multiple_links(self):
XCom.set(
key="job_id",
key="job_id_path",
value=["TEST_JOB_ID_1", "TEST_JOB_ID_2"],
task_id="TEST_MULTIPLE_QUERY",
dag_id=self.dag.dag_id,
Expand Down
46 changes: 27 additions & 19 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
TEST_JOB_ID_1 = "test-job-id"
TEST_JOB_ID_2 = "test-123"
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"


class TestBigQueryCreateEmptyTableOperator:
Expand Down Expand Up @@ -673,10 +677,10 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)

ti.xcom_push("job_id", 12345)
ti.xcom_push("job_id_path", TEST_FULL_JOB_ID)

url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
assert url == "https://console.cloud.google.com/bigquery?j=12345"
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"

@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
Expand Down Expand Up @@ -711,17 +715,18 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])

assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== simple_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== simple_task.get_extra_links(ti, "BigQuery Console #2")
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -740,7 +745,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
self, mock_hook, create_task_instance_of_operator
self,
mock_hook,
create_task_instance_of_operator,
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
Expand All @@ -751,11 +758,11 @@ def test_bigquery_operator_extra_link_when_single_query(
)
bigquery_task = ti.task

job_id = "12345"
ti.xcom_push(key="job_id", value=job_id)
ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)

assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
ti, BigQueryConsoleLink.name
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -771,17 +778,18 @@ def test_bigquery_operator_extra_link_when_multiple_query(
)
bigquery_task = ti.task

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])

assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
)


Expand Down

0 comments on commit 2a79fb7

Please sign in to comment.