Migrate Google firestore example to new design AIP-47 (#24830)
related: #22447, #22430
chenglongyan committed Jul 12, 2022
1 parent f5cd2c3 commit fb51e04
Showing 5 changed files with 52 additions and 89 deletions.
16 changes: 0 additions & 16 deletions airflow/providers/google/firebase/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-google/example-dags.rst
@@ -23,7 +23,7 @@ You can learn how to use Google integrations by analyzing the source code of the
* `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/ads/example_dags>`__
* `Google Cloud (legacy) <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/cloud/example_dags>`__
* `Google Cloud <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/cloud>`__
* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/firebase/example_dags>`__
* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/firebase>`__
* `Google Marketing Platform <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/marketing_platform/example_dags>`__
* `Google Workplace <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/suite/example_dags>`__ (formerly Google Suite)
* `Google LevelDB <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/leveldb>`__
@@ -41,7 +41,7 @@ Export database
Exporting a copy of all or a subset of documents from Google Cloud Firestore to Google Cloud Storage is performed with the
:class:`~airflow.providers.google.firebase.operators.firestore.CloudFirestoreExportDatabaseOperator` operator.

.. exampleinclude:: /../../airflow/providers/google/firebase/example_dags/example_firestore.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_firestore.py
:language: python
:dedent: 4
:start-after: [START howto_operator_export_database_to_gcs]
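For readers following the doc change above, here is a minimal sketch of how the referenced snippet wires the export operator into a DAG. It assumes the Google provider package is installed; the DAG id, project, bucket, and collection values are illustrative placeholders, not the ones used by the migrated system test.

from datetime import datetime

from airflow import models
from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator

with models.DAG(
    "firestore_export_sketch",  # hypothetical DAG id, for illustration only
    start_date=datetime(2021, 1, 1),
    schedule_interval="@once",
    catchup=False,
) as dag:
    # Export a single collection to a GCS bucket; `body` follows the Firestore
    # export request shape used in the example (outputUriPrefix, collectionIds).
    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
        task_id="export_database_to_gcs",
        project_id="example-firebase-project",  # placeholder Firestore-enabled project
        body={
            "outputUriPrefix": "gs://example-bucket/namespace/",  # placeholder GCS destination
            "collectionIds": ["example-collection"],              # placeholder collection
        },
    )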
48 changes: 0 additions & 48 deletions tests/providers/google/firebase/operators/test_firestore_system.py

This file was deleted.

@@ -48,43 +48,39 @@
from urllib.parse import urlparse

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateExternalTableOperator,
BigQueryDeleteDatasetOperator,
BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_google_firestore"
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-gcp-project")
FIRESTORE_PROJECT_ID = os.environ.get("G_FIRESTORE_PROJECT_ID", "example-firebase-project")

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
EXPORT_DESTINATION_URL = os.environ.get("GCP_FIRESTORE_ARCHIVE_URL", "gs://INVALID BUCKET NAME/namespace/")
BUCKET_NAME = urlparse(EXPORT_DESTINATION_URL).hostname
EXPORT_PREFIX = urlparse(EXPORT_DESTINATION_URL).path

EXPORT_COLLECTION_ID = os.environ.get("GCP_FIRESTORE_COLLECTION_ID", "firestore_collection_id")
DATASET_NAME = os.environ.get("GCP_FIRESTORE_DATASET_NAME", "test_firestore_export")
DATASET_LOCATION = os.environ.get("GCP_FIRESTORE_DATASET_LOCATION", "EU")

if BUCKET_NAME is None:
raise ValueError("Bucket name is required. Please set GCP_FIRESTORE_ARCHIVE_URL env variable.")

with models.DAG(
"example_google_firestore",
DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval='@once',
catchup=False,
tags=["example"],
tags=["example", "firestore"],
) as dag:
# [START howto_operator_export_database_to_gcs]
export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
task_id="export_database_to_gcs",
project_id=FIRESTORE_PROJECT_ID,
body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
)
# [END howto_operator_export_database_to_gcs]
create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)

create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset",
@@ -93,9 +89,13 @@
project_id=GCP_PROJECT_ID,
)

delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, project_id=GCP_PROJECT_ID, delete_contents=True
# [START howto_operator_export_database_to_gcs]
export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
task_id="export_database_to_gcs",
project_id=FIRESTORE_PROJECT_ID,
body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
)
# [END howto_operator_export_database_to_gcs]

# [START howto_operator_create_external_table_multiple_types]
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
@@ -132,12 +132,39 @@
},
)

chain(
# Firestore
export_database_to_gcs,
# BigQuery
create_dataset,
create_external_table_multiple_types,
read_data_from_gcs_multiple_types,
delete_dataset,
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
dataset_id=DATASET_NAME,
project_id=GCP_PROJECT_ID,
delete_contents=True,
trigger_rule=TriggerRule.ALL_DONE,
)

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

(
# TEST SETUP
create_bucket
>> create_dataset
# TEST BODY
>> export_database_to_gcs
>> create_external_table_multiple_types
>> read_data_from_gcs_multiple_types
# TEST TEARDOWN
>> delete_dataset
>> delete_bucket
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
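As the comment above notes, the `get_test_run` helper exposes the example DAG as a regular pytest test. A hedged sketch of one way to invoke it programmatically; the file path comes from the exampleinclude directive earlier in this diff, while the environment id value is a placeholder, and running it for real requires GCP credentials and the project environment variables read at the top of the file.

# Sketch: run the system test through pytest's programmatic entry point.
# SYSTEM_TESTS_ENV_ID must be set, since BUCKET_NAME and DATASET_NAME embed it.
import os

import pytest

os.environ.setdefault("SYSTEM_TESTS_ENV_ID", "dev")  # placeholder environment id
pytest.main(["tests/system/providers/google/cloud/gcs/example_firestore.py"])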
