Add example DAG for demonstrating usage of GCS sensors (#22808)
The following GCS sensor examples are provided as part of the change:
1. GCSUploadSessionCompleteSensor
2. GCSObjectUpdateSensor

The commit does the following:
1. Deletes the newly created top-level example_gcs.py, as it was the
   wrong place for the sensors
2. Adds the intended sensors of the PR to the existing example_gcs.py file
   located in the airflow/providers/google/cloud/example_dags directory
pankajkoti committed Apr 8, 2022
1 parent a7b1f62 commit 6aa65a3
Showing 3 changed files with 66 additions and 9 deletions.
47 changes: 39 additions & 8 deletions airflow/providers/google/cloud/example_dags/example_gcs.py
@@ -37,35 +37,44 @@
from airflow.providers.google.cloud.sensors.gcs import (
    GCSObjectExistenceSensor,
    GCSObjectsWithPrefixExistenceSensor,
    GCSObjectUpdateSensor,
    GCSUploadSessionCompleteSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

START_DATE = datetime(2021, 1, 1)

-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
GCS_ACL_BUCKET_ROLE = "OWNER"
GCS_ACL_OBJECT_ROLE = "OWNER"

-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")

temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
    "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, "transform_script.py")
)
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
    "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-example-upload.txt")
)
-PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
    "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt")
)

BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]

# Upload 'test-gcs-manual-example-upload.txt' to <BUCKET_1> manually after triggering
# the DAG; the upload session and object update sensors below wait for it.
PATH_TO_MANUAL_UPLOAD_FILE = os.getenv(
"GCP_GCS_PATH_TO_MANUAL_UPLOAD_FILE", os.path.join(temp_dir_path, "test-gcs-manual-example-upload.txt")
)
BUCKET_MANUAL_UPLOAD_FILE_LOCATION = PATH_TO_MANUAL_UPLOAD_FILE.rpartition("/")[-1]
PATH_TO_MANUAL_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_MANUAL_UPLOAD_FILE_PREFIX", "test-gcs-manual-")

with models.DAG(
    "example_gcs",
    start_date=START_DATE,
@@ -208,9 +217,31 @@
task_id="gcs_object_with_prefix_exists_task",
)
# [END howto_sensor_object_with_prefix_exists_task]

    # [START howto_sensor_gcs_upload_session_complete_task]
    gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
        bucket=BUCKET_1,
        prefix=PATH_TO_MANUAL_UPLOAD_FILE_PREFIX,
        inactivity_period=60,  # seconds without new matching objects before the session counts as complete
        min_objects=1,  # at least one matching object must appear for the session to be valid
        allow_delete=True,  # treat objects deleted between pokes as valid behaviour
        previous_objects=set(),  # no objects have been seen before the first poke
        task_id="gcs_upload_session_complete_task",
    )
    # [END howto_sensor_gcs_upload_session_complete_task]

    # [START howto_sensor_object_update_exists_task]
    gcs_update_object_exists = GCSObjectUpdateSensor(
        bucket=BUCKET_1,
        object=BUCKET_MANUAL_UPLOAD_FILE_LOCATION,
        task_id="gcs_object_update_sensor_task",
    )
    # [END howto_sensor_object_update_exists_task]

    delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)

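    # Two independent paths share create_bucket and delete_bucket: the scripted
    # upload path below, and the manual-upload path exercised by the two new sensors.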
    create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket
    create_bucket >> gcs_upload_session_complete >> gcs_update_object_exists >> delete_bucket


if __name__ == '__main__':
2 changes: 1 addition & 1 deletion airflow/providers/google/common/utils/id_token_credentials.py
@@ -41,7 +41,7 @@


class IDTokenCredentialsAdapter(google_auth_credentials.Credentials):
-    """Convert Credentials with "openid" scope to IDTokenCredentials."""
+    """Convert Credentials with ``openid`` scope to IDTokenCredentials."""

    def __init__(self, credentials: oauth2_credentials.Credentials):
        super().__init__()
26 changes: 26 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -193,6 +193,32 @@ Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor`
    :start-after: [START howto_sensor_object_with_prefix_exists_task]
    :end-before: [END howto_sensor_object_with_prefix_exists_task]

.. _howto/sensor:GCSUploadSessionCompleteSensor:

GCSUploadSessionCompleteSensor
------------------------------

Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor` to check for a change in the number of objects with a specified prefix in a Google Cloud Storage bucket. The sensor succeeds once the set of matching objects stops growing for the configured inactivity period.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_gcs_upload_session_complete_task]
    :end-before: [END howto_sensor_gcs_upload_session_complete_task]
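
Waiting out the inactivity period can hold a worker slot for a long time, so the
sensor can also be run in reschedule mode, which frees the slot between pokes.
A minimal sketch, assuming the bucket and prefix defaults used in the example DAG
above (task name and timings are illustrative only):

.. code-block:: python

    from airflow.providers.google.cloud.sensors.gcs import GCSUploadSessionCompleteSensor

    wait_for_upload_session = GCSUploadSessionCompleteSensor(
        bucket="test-gcs-example-bucket",  # assumed: default bucket from the example DAG
        prefix="test-gcs-manual-",  # assumed: default prefix from the example DAG
        inactivity_period=10 * 60,  # ten minutes without new objects ends the session
        min_objects=1,
        mode="reschedule",  # release the worker slot between pokes
        task_id="wait_for_upload_session",
    )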

.. _howto/sensor:GCSObjectUpdateSensor:

GCSObjectUpdateSensor
---------------------

Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor` to check if an object is updated in Google Cloud Storage.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_object_update_exists_task]
    :end-before: [END howto_sensor_object_update_exists_task]
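
By default the sensor compares the object's last-modified time against the end of
the current DAG run's data interval; a custom ``ts_func`` callback can supply a
different reference time. A minimal sketch with a hypothetical fixed cut-off
(names and timestamp are illustrative only):

.. code-block:: python

    from datetime import datetime, timezone

    from airflow.providers.google.cloud.sensors.gcs import GCSObjectUpdateSensor

    def updated_after_cutoff(context):
        # Hypothetical reference time: the object counts as updated only if it
        # changed after this fixed instant.
        return datetime(2022, 1, 1, tzinfo=timezone.utc)

    gcs_object_update_after_cutoff = GCSObjectUpdateSensor(
        bucket="test-gcs-example-bucket",  # assumed: default bucket from the example DAG
        object="test-gcs-manual-example-upload.txt",
        ts_func=updated_after_cutoff,
        task_id="gcs_object_update_after_cutoff",
    )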

More information
""""""""""""""""
