Skip to content

Commit

Permalink
Fix BigQueryColumnCheckOperator runtime error (#28796)
Browse files Browse the repository at this point in the history
  • Loading branch information
vchiapaikeo committed Jan 9, 2023
1 parent 6ca67ba commit c67f4af
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def execute(self, context=None):
self.column_mapping[column][check], result, tolerance
)

failed_tests(
failed_tests.extend(
f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
for col, checks in self.column_mapping.items()
for check, check_values in checks.items()
Expand Down
63 changes: 63 additions & 0 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from unittest import mock
from unittest.mock import MagicMock

import pandas as pd
import pytest
from google.cloud.bigquery import DEFAULT_RETRY
from google.cloud.exceptions import Conflict
Expand All @@ -31,6 +32,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryColumnCheckOperator,
BigQueryConsoleIndexableLink,
BigQueryConsoleLink,
BigQueryCreateEmptyDatasetOperator,
Expand Down Expand Up @@ -1676,3 +1678,64 @@ def test_bigquery_value_check_empty():
with pytest.raises(AirflowException) as missing_param:
BigQueryValueCheckOperator(deferrable=True, kwargs={})
assert (missing_param.value.args[0] == expected) or (missing_param.value.args[0] == expected1)


@pytest.mark.parametrize(
    "check_type, check_value, check_result",
    [
        ("equal_to", 0, 0),
        ("greater_than", 0, 1),
        ("less_than", 0, -1),
        ("geq_to", 0, 1),
        ("geq_to", 0, 0),
        ("leq_to", 0, 0),
        ("leq_to", 0, -1),
    ],
)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
def test_bigquery_column_check_operator_succeeds(mock_job, mock_hook, check_type, check_value, check_result):
    """Run the column check operator against a mocked result that satisfies the check.

    Each parametrized case pairs a comparison (``check_type``) and its threshold
    (``check_value``) with a ``check_result`` that meets it; ``execute`` is
    expected to return without raising.
    """
    # One-row frame handed back by the mocked job: the "min" check on "col1".
    mocked_frame = pd.DataFrame(
        {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
    )
    mock_job.result.return_value.to_dataframe.return_value = mocked_frame
    # The patched hook hands out the mocked job from insert_job().
    mock_hook.return_value.insert_job.return_value = mock_job

    column_checks = {
        "col1": {"min": {check_type: check_value}},
    }
    operator = BigQueryColumnCheckOperator(
        task_id="check_column_succeeds",
        table=TEST_TABLE_ID,
        use_legacy_sql=False,
        column_mapping=column_checks,
    )
    operator.execute(create_context(operator))


@pytest.mark.parametrize(
    "check_type, check_value, check_result",
    [
        ("equal_to", 0, 1),
        ("greater_than", 0, -1),
        ("less_than", 0, 1),
        ("geq_to", 0, -1),
        ("leq_to", 0, 1),
    ],
)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
def test_bigquery_column_check_operator_fails(mock_job, mock_hook, check_type, check_value, check_result):
    """Run the column check operator against a mocked result that violates the check.

    Each parametrized case pairs a comparison (``check_type``) and its threshold
    (``check_value``) with a ``check_result`` that does not meet it; ``execute``
    is expected to raise ``AirflowException``.
    """
    # Feed the parametrized result into the mocked job. Hardcoding the result
    # and the mapping here would make every case re-run the same "equal_to"
    # scenario and leave the other comparison types untested.
    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
        {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
    )
    # The patched hook hands out the mocked job from insert_job().
    mock_hook.return_value.insert_job.return_value = mock_job

    op = BigQueryColumnCheckOperator(
        task_id="check_column_fails",
        table=TEST_TABLE_ID,
        use_legacy_sql=False,
        column_mapping={
            "col1": {"min": {check_type: check_value}},
        },
    )
    with pytest.raises(AirflowException):
        op.execute(create_context(op))

0 comments on commit c67f4af

Please sign in to comment.