Implement Google BigQuery Table Partition Sensor (#10218)
thejens committed Aug 13, 2020
1 parent a74a7da commit 2f0613b
Showing 7 changed files with 298 additions and 1 deletion.
102 changes: 102 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
@@ -0,0 +1,102 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Google BigQuery Sensors.
"""
import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator,
    BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor, BigQueryTablePartitionExistenceSensor,
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_sensors_dataset")

TABLE_NAME = "partitioned_table"
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")

PARTITION_NAME = "{{ ds_nodash }}"

INSERT_ROWS_QUERY = \
    f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES " \
    "(42, '{{ ds }}')"

SCHEMA = [
    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "ds", "type": "DATE", "mode": "NULLABLE"},
]

dag_id = "example_bigquery_sensors"

with models.DAG(
    dag_id,
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
    default_args={"project_id": PROJECT_ID}
) as dag_with_locations:
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create-dataset", dataset_id=DATASET_NAME, project_id=PROJECT_ID
    )

    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        schema_fields=SCHEMA,
        time_partitioning={
            "type": "DAY",
            "field": "ds",
        }
    )
    # [START howto_sensor_bigquery_table]
    check_table_exists = BigQueryTableExistenceSensor(
        task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
    )
    # [END howto_sensor_bigquery_table]

    execute_insert_query = BigQueryExecuteQueryOperator(
        task_id="execute_insert_query", sql=INSERT_ROWS_QUERY, use_legacy_sql=False
    )

    # [START howto_sensor_bigquery_table_partition]
    check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
        task_id="check_table_partition_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME,
        table_id=TABLE_NAME, partition_id=PARTITION_NAME
    )
    # [END howto_sensor_bigquery_table_partition]

    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
    )

    create_dataset >> create_table
    create_table >> check_table_exists
    create_table >> execute_insert_query
    execute_insert_query >> check_table_partition_exists
    check_table_exists >> delete_dataset
    check_table_partition_exists >> delete_dataset
29 changes: 29 additions & 0 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
@@ -224,6 +224,35 @@ def table_exists(self, dataset_id: str, table_id: str, project_id: str) -> bool:
        except NotFound:
            return False

    @GoogleBaseHook.fallback_to_default_project_id
    def table_partition_exists(
        self,
        dataset_id: str,
        table_id: str,
        partition_id: str,
        project_id: str
    ) -> bool:
        """
        Checks for the existence of a partition in a table in Google BigQuery.

        :param project_id: The Google Cloud project in which to look for the
            table. The connection supplied to the hook must provide access to
            the specified project.
        :type project_id: str
        :param dataset_id: The name of the dataset in which to look for the
            table.
        :type dataset_id: str
        :param table_id: The name of the table to check the existence of.
        :type table_id: str
        :param partition_id: The name of the partition to check the existence of.
        :type partition_id: str
        """
        table_reference = TableReference(DatasetReference(project_id, dataset_id), table_id)
        try:
            return partition_id in self.get_client(project_id=project_id).list_partitions(table_reference)
        except NotFound:
            return False

    @GoogleBaseHook.fallback_to_default_project_id
    def create_empty_table(  # pylint: disable=too-many-arguments
        self,
58 changes: 58 additions & 0 deletions airflow/providers/google/cloud/sensors/bigquery.py
@@ -75,3 +75,61 @@ def poke(self, context):
            project_id=self.project_id,
            dataset_id=self.dataset_id,
            table_id=self.table_id)


class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
    """
    Checks for the existence of a partition within a table in Google BigQuery.

    :param project_id: The Google Cloud project in which to look for the table.
        The connection supplied to the hook must provide
        access to the specified project.
    :type project_id: str
    :param dataset_id: The name of the dataset in which to look for the table.
    :type dataset_id: str
    :param table_id: The name of the table to check the existence of.
    :type table_id: str
    :param partition_id: The name of the partition to check the existence of.
    :type partition_id: str
    :param bigquery_conn_id: The connection ID to use when connecting to
        Google BigQuery.
    :type bigquery_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must
        have domain-wide delegation enabled.
    :type delegate_to: str
    """
    template_fields = ('project_id', 'dataset_id', 'table_id', 'partition_id',)
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(self, *,
                 project_id: str,
                 dataset_id: str,
                 table_id: str,
                 partition_id: str,
                 bigquery_conn_id: str = 'google_cloud_default',
                 delegate_to: Optional[str] = None,
                 **kwargs) -> None:

        super().__init__(**kwargs)
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.partition_id = partition_id
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def poke(self, context):
        table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
        self.log.info('Sensor checks existence of partition: "%s" in table: %s', self.partition_id, table_uri)
        hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to)
        return hook.table_partition_exists(
            project_id=self.project_id,
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            partition_id=self.partition_id
        )
32 changes: 32 additions & 0 deletions docs/howto/operator/google/cloud/bigquery.rst
@@ -338,6 +338,38 @@ tolerance of the ones from ``days_back`` before you can use
    :start-after: [START howto_operator_bigquery_interval_check]
    :end-before: [END howto_operator_bigquery_interval_check]

Sensors
^^^^^^^

Check that a Table exists
"""""""""""""""""""""""""

To check that a table exists, you can define a sensor operator. This allows delaying execution
of downstream operators until the table exists. If the table is sharded on dates, you can for
instance use the ``{{ ds_nodash }}`` macro as the table name suffix (see the sketch after the
included example). For this check, use
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceSensor`.

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_bigquery_table]
    :end-before: [END howto_sensor_bigquery_table]
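
A minimal standalone sketch for a date-sharded table might look like the following; the
``events_`` table prefix and the project and dataset names are placeholders, not identifiers
used by the example DAG:

.. code-block:: python

    from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor

    check_shard_exists = BigQueryTableExistenceSensor(
        task_id="check_shard_exists",
        project_id="example-project",
        dataset_id="example_dataset",
        # table_id is templated, so this resolves to e.g. events_20200813.
        table_id="events_{{ ds_nodash }}",
    )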

Check that a Table Partition exists
"""""""""""""""""""""""""""""""""""

To check that a table exists and has a partition, you can use the
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTablePartitionExistenceSensor`.

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_bigquery_table_partition]
    :end-before: [END howto_sensor_bigquery_table_partition]

For ``DAY``-partitioned tables, the ``partition_id`` parameter is a string in the ``"%Y%m%d"`` format.
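
A minimal sketch, assuming a ``DAY``-partitioned table keyed on the execution date (the project,
dataset, and table names below are placeholders):

.. code-block:: python

    from airflow.providers.google.cloud.sensors.bigquery import BigQueryTablePartitionExistenceSensor

    wait_for_partition = BigQueryTablePartitionExistenceSensor(
        task_id="wait_for_partition",
        project_id="example-project",
        dataset_id="example_dataset",
        table_id="partitioned_table",
        # partition_id is templated; for DAY partitioning, ds_nodash renders to
        # the expected "%Y%m%d" partition id, e.g. 20200813.
        partition_id="{{ ds_nodash }}",
    )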

Reference
^^^^^^^^^

Expand Down
40 changes: 40 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
@@ -34,6 +34,7 @@
CREDENTIALS = "bq-credentials"
DATASET_ID = "bq_dataset"
TABLE_ID = "bq_table"
PARTITION_ID = "20200101"
VIEW_ID = 'bq_view'
JOB_ID = "1234"
LOCATION = 'europe-north1'
@@ -119,6 +120,45 @@ def test_bigquery_table_exists_false(self, mock_client):
        mock_client.assert_called_once_with(project_id=PROJECT_ID)
        assert result is False

    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
    def test_bigquery_table_partition_exists_true(self, mock_client):
        mock_client.return_value.list_partitions.return_value = [PARTITION_ID]
        result = self.hook.table_partition_exists(
            project_id=PROJECT_ID,
            dataset_id=DATASET_ID,
            table_id=TABLE_ID,
            partition_id=PARTITION_ID
        )
        mock_client.return_value.list_partitions.assert_called_once_with(TABLE_REFERENCE)
        mock_client.assert_called_once_with(project_id=PROJECT_ID)
        assert result is True

    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
    def test_bigquery_table_partition_exists_false_no_table(self, mock_client):
        # The hook only calls list_partitions(), so raise NotFound from that call
        # to exercise the missing-table branch.
        mock_client.return_value.list_partitions.side_effect = NotFound("Dataset not found")
        result = self.hook.table_partition_exists(
            project_id=PROJECT_ID,
            dataset_id=DATASET_ID,
            table_id=TABLE_ID,
            partition_id=PARTITION_ID
        )
        mock_client.return_value.list_partitions.assert_called_once_with(TABLE_REFERENCE)
        mock_client.assert_called_once_with(project_id=PROJECT_ID)
        assert result is False

    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
    def test_bigquery_table_partition_exists_false_no_partition(self, mock_client):
        mock_client.return_value.list_partitions.return_value = []
        result = self.hook.table_partition_exists(
            project_id=PROJECT_ID,
            dataset_id=DATASET_ID,
            table_id=TABLE_ID,
            partition_id=PARTITION_ID
        )
        mock_client.return_value.list_partitions.assert_called_once_with(TABLE_REFERENCE)
        mock_client.assert_called_once_with(project_id=PROJECT_ID)
        assert result is False

    @mock.patch('airflow.providers.google.cloud.hooks.bigquery.read_gbq')
    def test_get_pandas_df(self, mock_read_gbq):
        self.hook.get_pandas_df('select 1')
@@ -49,6 +49,10 @@ def test_run_example_dag_operations_location(self):
    def test_run_example_dag_queries(self):
        self.run_dag('example_bigquery_queries', CLOUD_DAG_FOLDER)

    @provide_gcp_context(GCP_BIGQUERY_KEY)
    def test_run_example_dag_sensors(self):
        self.run_dag('example_bigquery_sensors', CLOUD_DAG_FOLDER)

    @provide_gcp_context(GCP_BIGQUERY_KEY)
    def test_run_example_dag_queries_location(self):
        self.run_dag('example_bigquery_queries_location', CLOUD_DAG_FOLDER)
34 changes: 33 additions & 1 deletion tests/providers/google/cloud/sensors/test_bigquery.py
@@ -17,13 +17,16 @@

from unittest import TestCase, mock

from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
from airflow.providers.google.cloud.sensors.bigquery import (
    BigQueryTableExistenceSensor, BigQueryTablePartitionExistenceSensor,
)

TEST_PROJECT_ID = "test_project"
TEST_DATASET_ID = 'test_dataset'
TEST_TABLE_ID = 'test_table'
TEST_DELEGATE_TO = "test_delegate_to"
TEST_GCP_CONN_ID = 'test_gcp_conn_id'
TEST_PARTITION_ID = "20200101"


class TestBigqueryTableExistenceSensor(TestCase):
@@ -51,3 +54,32 @@ def test_passing_arguments_to_hook(self, mock_hook):
            dataset_id=TEST_DATASET_ID,
            table_id=TEST_TABLE_ID
        )


class TestBigqueryTablePartitionExistenceSensor(TestCase):
    @mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
    def test_passing_arguments_to_hook(self, mock_hook):
        task = BigQueryTablePartitionExistenceSensor(
            task_id='task-id',
            project_id=TEST_PROJECT_ID,
            dataset_id=TEST_DATASET_ID,
            table_id=TEST_TABLE_ID,
            partition_id=TEST_PARTITION_ID,
            bigquery_conn_id=TEST_GCP_CONN_ID,
            delegate_to=TEST_DELEGATE_TO
        )
        mock_hook.return_value.table_partition_exists.return_value = True
        results = task.poke(mock.MagicMock())

        self.assertEqual(True, results)

        mock_hook.assert_called_once_with(
            bigquery_conn_id=TEST_GCP_CONN_ID,
            delegate_to=TEST_DELEGATE_TO
        )
        mock_hook.return_value.table_partition_exists.assert_called_once_with(
            project_id=TEST_PROJECT_ID,
            dataset_id=TEST_DATASET_ID,
            table_id=TEST_TABLE_ID,
            partition_id=TEST_PARTITION_ID
        )
