Skip to content

Commit e31b27d

Browse files
authored
Add materialized view support for BigQuery (#14201)
1 parent e3bcaa3 commit e31b27d

File tree

6 files changed

+115
-3
lines changed

6 files changed

+115
-3
lines changed

airflow/providers/google/cloud/example_dags/example_bigquery_operations.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,26 @@
9898
)
9999
# [END howto_operator_bigquery_delete_view]
100100

101+
# [START howto_operator_bigquery_create_materialized_view]
102+
create_materialized_view = BigQueryCreateEmptyTableOperator(
103+
task_id="create_materialized_view",
104+
dataset_id=DATASET_NAME,
105+
table_id="test_materialized_view",
106+
materialized_view={
107+
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
108+
"enableRefresh": True,
109+
"refreshIntervalMs": 2000000,
110+
},
111+
)
112+
# [END howto_operator_bigquery_create_materialized_view]
113+
114+
# [START howto_operator_bigquery_delete_materialized_view]
115+
delete_materialized_view = BigQueryDeleteTableOperator(
116+
task_id="delete_materialized_view",
117+
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
118+
)
119+
# [END howto_operator_bigquery_delete_materialized_view]
120+
101121
# [START howto_operator_bigquery_create_external_table]
102122
create_external_table = BigQueryCreateExternalTableOperator(
103123
task_id="create_external_table",
@@ -185,10 +205,10 @@
185205

186206
create_dataset >> patch_dataset >> update_dataset >> get_dataset >> get_dataset_result >> delete_dataset
187207

188-
update_dataset >> create_table >> create_view >> update_table >> [
208+
update_dataset >> create_table >> create_view >> create_materialized_view >> update_table >> [
189209
get_dataset_tables,
190210
delete_view,
191-
] >> upsert_table >> delete_table >> delete_dataset
211+
] >> upsert_table >> delete_materialized_view >> delete_table >> delete_dataset
192212
update_dataset >> create_external_table >> delete_dataset
193213

194214
with models.DAG(

airflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ def create_empty_table( # pylint: disable=too-many-arguments
274274
cluster_fields: Optional[List[str]] = None,
275275
labels: Optional[Dict] = None,
276276
view: Optional[Dict] = None,
277+
materialized_view: Optional[Dict] = None,
277278
encryption_configuration: Optional[Dict] = None,
278279
retry: Optional[Retry] = DEFAULT_RETRY,
279280
num_retries: Optional[int] = None,
@@ -330,6 +331,8 @@ def create_empty_table( # pylint: disable=too-many-arguments
330331
"useLegacySql": False
331332
}
332333
334+
:param materialized_view: [Optional] The materialized view definition.
335+
:type materialized_view: dict
333336
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
334337
**Example**: ::
335338
@@ -366,6 +369,9 @@ def create_empty_table( # pylint: disable=too-many-arguments
366369
if view:
367370
_table_resource['view'] = view
368371

372+
if materialized_view:
373+
_table_resource['materializedView'] = materialized_view
374+
369375
if encryption_configuration:
370376
_table_resource["encryptionConfiguration"] = encryption_configuration
371377

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,8 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
841841
.. seealso::
842842
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
843843
:type view: dict
844+
:param materialized_view: [Optional] The materialized view definition.
845+
:type materialized_view: dict
844846
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
845847
**Example**: ::
846848
@@ -875,9 +877,10 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
875877
'gcs_schema_object',
876878
'labels',
877879
'view',
880+
'materialized_view',
878881
'impersonation_chain',
879882
)
880-
template_fields_renderers = {"table_resource": "json"}
883+
template_fields_renderers = {"table_resource": "json", "materialized_view": "json"}
881884
ui_color = BigQueryUIColors.TABLE.value
882885

883886
# pylint: disable=too-many-arguments
@@ -897,6 +900,7 @@ def __init__(
897900
delegate_to: Optional[str] = None,
898901
labels: Optional[Dict] = None,
899902
view: Optional[Dict] = None,
903+
materialized_view: Optional[Dict] = None,
900904
encryption_configuration: Optional[Dict] = None,
901905
location: Optional[str] = None,
902906
cluster_fields: Optional[List[str]] = None,
@@ -916,6 +920,7 @@ def __init__(
916920
self.time_partitioning = {} if time_partitioning is None else time_partitioning
917921
self.labels = labels
918922
self.view = view
923+
self.materialized_view = materialized_view
919924
self.encryption_configuration = encryption_configuration
920925
self.location = location
921926
self.cluster_fields = cluster_fields
@@ -952,6 +957,7 @@ def execute(self, context) -> None:
952957
cluster_fields=self.cluster_fields,
953958
labels=self.labels,
954959
view=self.view,
960+
materialized_view=self.materialized_view,
955961
encryption_configuration=self.encryption_configuration,
956962
table_resource=self.table_resource,
957963
exists_ok=False,

docs/apache-airflow-providers-google/operators/cloud/bigquery.rst

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,15 @@ You can use this operator to create a view on top of an existing table.
178178
:start-after: [START howto_operator_bigquery_create_view]
179179
:end-before: [END howto_operator_bigquery_create_view]
180180

181+
You can also use this operator to create a materialized view that periodically
182+
caches the results of a query for increased performance and efficiency.
183+
184+
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
185+
:language: python
186+
:dedent: 4
187+
:start-after: [START howto_operator_bigquery_create_materialized_view]
188+
:end-before: [END howto_operator_bigquery_create_materialized_view]
189+
181190
.. _howto/operator:BigQueryCreateExternalTableOperator:
182191

183192
Create external table
@@ -258,6 +267,14 @@ You can also use this operator to delete a view.
258267
:start-after: [START howto_operator_bigquery_delete_view]
259268
:end-before: [END howto_operator_bigquery_delete_view]
260269

270+
You can also use this operator to delete a materialized view.
271+
272+
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
273+
:language: python
274+
:dedent: 4
275+
:start-after: [START howto_operator_bigquery_delete_materialized_view]
276+
:end-before: [END howto_operator_bigquery_delete_materialized_view]
277+
261278
.. _howto/operator:BigQueryInsertJobOperator:
262279

263280
Execute BigQuery jobs

tests/providers/google/cloud/hooks/test_bigquery.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,35 @@ def test_get_tables_list(self, mock_client):
974974
for res, exp in zip(result, table_list):
975975
assert res["tableId"] == exp["tableReference"]["tableId"]
976976

977+
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Table")
978+
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
979+
def test_create_materialized_view(self, mock_bq_client, mock_table):
980+
query = """
981+
SELECT product, SUM(amount)
982+
FROM `test-project-id.test_dataset_id.test_table_prefix*`
983+
GROUP BY product
984+
"""
985+
materialized_view = {
986+
'query': query,
987+
'enableRefresh': True,
988+
'refreshIntervalMs': 2000000,
989+
}
990+
991+
self.hook.create_empty_table(
992+
project_id=PROJECT_ID,
993+
dataset_id=DATASET_ID,
994+
table_id=TABLE_ID,
995+
materialized_view=materialized_view,
996+
retry=DEFAULT_RETRY,
997+
)
998+
body = {'tableReference': TABLE_REFERENCE_REPR, 'materializedView': materialized_view}
999+
mock_table.from_api_repr.assert_called_once_with(body)
1000+
mock_bq_client.return_value.create_table.assert_called_once_with(
1001+
table=mock_table.from_api_repr.return_value,
1002+
exists_ok=True,
1003+
retry=DEFAULT_RETRY,
1004+
)
1005+
9771006

9781007
class TestBigQueryCursor(_BigQueryBaseTestClass):
9791008
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_service")

tests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@
6969
"query": f"SELECT * FROM `{TEST_DATASET}.{TEST_TABLE_ID}`",
7070
"useLegacySql": False,
7171
}
72+
MATERIALIZED_VIEW_DEFINITION = {
73+
'query': f'SELECT product, SUM(amount) FROM `{TEST_DATASET}.{TEST_TABLE_ID}` GROUP BY product',
74+
'enableRefresh': True,
75+
'refreshIntervalMs': 2000000,
76+
}
7277

7378

7479
class TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
@@ -88,6 +93,7 @@ def test_execute(self, mock_hook):
8893
cluster_fields=None,
8994
labels=None,
9095
view=None,
96+
materialized_view=None,
9197
encryption_configuration=None,
9298
table_resource=None,
9399
exists_ok=False,
@@ -113,6 +119,33 @@ def test_create_view(self, mock_hook):
113119
cluster_fields=None,
114120
labels=None,
115121
view=VIEW_DEFINITION,
122+
materialized_view=None,
123+
encryption_configuration=None,
124+
table_resource=None,
125+
exists_ok=False,
126+
)
127+
128+
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
129+
def test_create_materialized_view(self, mock_hook):
130+
operator = BigQueryCreateEmptyTableOperator(
131+
task_id=TASK_ID,
132+
dataset_id=TEST_DATASET,
133+
project_id=TEST_GCP_PROJECT_ID,
134+
table_id=TEST_TABLE_ID,
135+
materialized_view=MATERIALIZED_VIEW_DEFINITION,
136+
)
137+
138+
operator.execute(None)
139+
mock_hook.return_value.create_empty_table.assert_called_once_with(
140+
dataset_id=TEST_DATASET,
141+
project_id=TEST_GCP_PROJECT_ID,
142+
table_id=TEST_TABLE_ID,
143+
schema_fields=None,
144+
time_partitioning={},
145+
cluster_fields=None,
146+
labels=None,
147+
view=None,
148+
materialized_view=MATERIALIZED_VIEW_DEFINITION,
116149
encryption_configuration=None,
117150
table_resource=None,
118151
exists_ok=False,
@@ -148,6 +181,7 @@ def test_create_clustered_empty_table(self, mock_hook):
148181
cluster_fields=cluster_fields,
149182
labels=None,
150183
view=None,
184+
materialized_view=None,
151185
encryption_configuration=None,
152186
table_resource=None,
153187
exists_ok=False,

0 commit comments

Comments
 (0)