Skip to content

Commit

Permalink
Add materialized view support for BigQuery (#14201)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanyuan committed Feb 12, 2021
1 parent e3bcaa3 commit e31b27d
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@
)
# [END howto_operator_bigquery_delete_view]

# [START howto_operator_bigquery_create_materialized_view]
create_materialized_view = BigQueryCreateEmptyTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
materialized_view={
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 2000000,
},
)
# [END howto_operator_bigquery_create_materialized_view]

# [START howto_operator_bigquery_delete_materialized_view]
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
# [END howto_operator_bigquery_delete_materialized_view]

# [START howto_operator_bigquery_create_external_table]
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
Expand Down Expand Up @@ -185,10 +205,10 @@

create_dataset >> patch_dataset >> update_dataset >> get_dataset >> get_dataset_result >> delete_dataset

update_dataset >> create_table >> create_view >> update_table >> [
update_dataset >> create_table >> create_view >> create_materialized_view >> update_table >> [
get_dataset_tables,
delete_view,
] >> upsert_table >> delete_table >> delete_dataset
] >> upsert_table >> delete_materialized_view >> delete_table >> delete_dataset
update_dataset >> create_external_table >> delete_dataset

with models.DAG(
Expand Down
6 changes: 6 additions & 0 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def create_empty_table( # pylint: disable=too-many-arguments
cluster_fields: Optional[List[str]] = None,
labels: Optional[Dict] = None,
view: Optional[Dict] = None,
materialized_view: Optional[Dict] = None,
encryption_configuration: Optional[Dict] = None,
retry: Optional[Retry] = DEFAULT_RETRY,
num_retries: Optional[int] = None,
Expand Down Expand Up @@ -330,6 +331,8 @@ def create_empty_table( # pylint: disable=too-many-arguments
"useLegacySql": False
}
:param materialized_view: [Optional] The materialized view definition.
:type materialized_view: dict
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
**Example**: ::
Expand Down Expand Up @@ -366,6 +369,9 @@ def create_empty_table( # pylint: disable=too-many-arguments
if view:
_table_resource['view'] = view

if materialized_view:
_table_resource['materializedView'] = materialized_view

if encryption_configuration:
_table_resource["encryptionConfiguration"] = encryption_configuration

Expand Down
8 changes: 7 additions & 1 deletion airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,8 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
.. seealso::
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
:type view: dict
:param materialized_view: [Optional] The materialized view definition.
:type materialized_view: dict
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
**Example**: ::
Expand Down Expand Up @@ -875,9 +877,10 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
'gcs_schema_object',
'labels',
'view',
'materialized_view',
'impersonation_chain',
)
template_fields_renderers = {"table_resource": "json"}
template_fields_renderers = {"table_resource": "json", "materialized_view": "json"}
ui_color = BigQueryUIColors.TABLE.value

# pylint: disable=too-many-arguments
Expand All @@ -897,6 +900,7 @@ def __init__(
delegate_to: Optional[str] = None,
labels: Optional[Dict] = None,
view: Optional[Dict] = None,
materialized_view: Optional[Dict] = None,
encryption_configuration: Optional[Dict] = None,
location: Optional[str] = None,
cluster_fields: Optional[List[str]] = None,
Expand All @@ -916,6 +920,7 @@ def __init__(
self.time_partitioning = {} if time_partitioning is None else time_partitioning
self.labels = labels
self.view = view
self.materialized_view = materialized_view
self.encryption_configuration = encryption_configuration
self.location = location
self.cluster_fields = cluster_fields
Expand Down Expand Up @@ -952,6 +957,7 @@ def execute(self, context) -> None:
cluster_fields=self.cluster_fields,
labels=self.labels,
view=self.view,
materialized_view=self.materialized_view,
encryption_configuration=self.encryption_configuration,
table_resource=self.table_resource,
exists_ok=False,
Expand Down
17 changes: 17 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ You can use this operator to create a view on top of an existing table.
:start-after: [START howto_operator_bigquery_create_view]
:end-before: [END howto_operator_bigquery_create_view]

You can also use this operator to create a materialized view that periodically
cache results of a query for increased performance and efficiency.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_create_materialized_view]
:end-before: [END howto_operator_bigquery_create_materialized_view]

.. _howto/operator:BigQueryCreateExternalTableOperator:

Create external table
Expand Down Expand Up @@ -258,6 +267,14 @@ You can also use this operator to delete a view.
:start-after: [START howto_operator_bigquery_delete_view]
:end-before: [END howto_operator_bigquery_delete_view]

You can also use this operator to delete a materialized view.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
:language: python
:dedent: 4
:start-after: [START howto_operator_bigquery_delete_materialized_view]
:end-before: [END howto_operator_bigquery_delete_materialized_view]

.. _howto/operator:BigQueryInsertJobOperator:

Execute BigQuery jobs
Expand Down
29 changes: 29 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,35 @@ def test_get_tables_list(self, mock_client):
for res, exp in zip(result, table_list):
assert res["tableId"] == exp["tableReference"]["tableId"]

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Table")
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
def test_create_materialized_view(self, mock_bq_client, mock_table):
query = """
SELECT product, SUM(amount)
FROM `test-project-id.test_dataset_id.test_table_prefix*`
GROUP BY product
"""
materialized_view = {
'query': query,
'enableRefresh': True,
'refreshIntervalMs': 2000000,
}

self.hook.create_empty_table(
project_id=PROJECT_ID,
dataset_id=DATASET_ID,
table_id=TABLE_ID,
materialized_view=materialized_view,
retry=DEFAULT_RETRY,
)
body = {'tableReference': TABLE_REFERENCE_REPR, 'materializedView': materialized_view}
mock_table.from_api_repr.assert_called_once_with(body)
mock_bq_client.return_value.create_table.assert_called_once_with(
table=mock_table.from_api_repr.return_value,
exists_ok=True,
retry=DEFAULT_RETRY,
)


class TestBigQueryCursor(_BigQueryBaseTestClass):
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")
Expand Down
34 changes: 34 additions & 0 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
"query": f"SELECT * FROM `{TEST_DATASET}.{TEST_TABLE_ID}`",
"useLegacySql": False,
}
MATERIALIZED_VIEW_DEFINITION = {
'query': f'SELECT product, SUM(amount) FROM `{TEST_DATASET}.{TEST_TABLE_ID}` GROUP BY product',
'enableRefresh': True,
'refreshIntervalMs': 2000000,
}


class TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
Expand All @@ -88,6 +93,7 @@ def test_execute(self, mock_hook):
cluster_fields=None,
labels=None,
view=None,
materialized_view=None,
encryption_configuration=None,
table_resource=None,
exists_ok=False,
Expand All @@ -113,6 +119,33 @@ def test_create_view(self, mock_hook):
cluster_fields=None,
labels=None,
view=VIEW_DEFINITION,
materialized_view=None,
encryption_configuration=None,
table_resource=None,
exists_ok=False,
)

@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_create_materialized_view(self, mock_hook):
operator = BigQueryCreateEmptyTableOperator(
task_id=TASK_ID,
dataset_id=TEST_DATASET,
project_id=TEST_GCP_PROJECT_ID,
table_id=TEST_TABLE_ID,
materialized_view=MATERIALIZED_VIEW_DEFINITION,
)

operator.execute(None)
mock_hook.return_value.create_empty_table.assert_called_once_with(
dataset_id=TEST_DATASET,
project_id=TEST_GCP_PROJECT_ID,
table_id=TEST_TABLE_ID,
schema_fields=None,
time_partitioning={},
cluster_fields=None,
labels=None,
view=None,
materialized_view=MATERIALIZED_VIEW_DEFINITION,
encryption_configuration=None,
table_resource=None,
exists_ok=False,
Expand Down Expand Up @@ -148,6 +181,7 @@ def test_create_clustered_empty_table(self, mock_hook):
cluster_fields=cluster_fields,
labels=None,
view=None,
materialized_view=None,
encryption_configuration=None,
table_resource=None,
exists_ok=False,
Expand Down

0 comments on commit e31b27d

Please sign in to comment.