Skip to content

Commit

Permalink
Deprecate CloudComposerEnvironmentSensor in favor of `CloudComposer…
Browse files Browse the repository at this point in the history
…CreateEnvironmentOperator` with defer mode (#35775)

Co-authored-by: Ulada Zakharava <[email protected]>
  • Loading branch information
VladaZakharova and Ulada Zakharava committed Nov 25, 2023
1 parent 196a235 commit 373d8a5
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 137 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/cloud_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CloudComposerCreateEnvironmentOperator(GoogleCloudBaseOperator):
:param metadata: Strings which should be sent along with the request as metadata.
:param deferrable: Run operator in the deferrable mode
:param pooling_period_seconds: Optional: Control the rate of the poll for the result of deferrable run.
By default the trigger will poll every 30 seconds.
By default, the trigger will poll every 30 seconds.
"""

template_fields = (
Expand Down
15 changes: 14 additions & 1 deletion airflow/providers/google/cloud/sensors/cloud_composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Sequence

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.google.cloud.triggers.cloud_composer import CloudComposerExecutionTrigger
from airflow.sensors.base import BaseSensorOperator

Expand All @@ -33,6 +34,11 @@ class CloudComposerEnvironmentSensor(BaseSensorOperator):
"""
Check the status of the Cloud Composer Environment task.
This Sensor is deprecated. You can achieve the same functionality by using Cloud Composer Operators
CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator and
CloudComposerUpdateEnvironmentOperator in deferrable or non-deferrable mode, since every operator
gives user a possibility to wait (asynchronously or synchronously) until Operation will be finished.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param operation_name: The name of the operation resource
Expand All @@ -59,6 +65,13 @@ def __init__(
pooling_period_seconds: int = 30,
**kwargs,
):
warnings.warn(
f"The `{self.__class__.__name__}` operator is deprecated. You can achieve the same functionality "
f"by using operators in deferrable or non-deferrable mode, since every operator for Cloud "
f"Composer will wait for the operation to complete.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ With this configuration we can create the environment:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_composer_environment_deferrable_mode]
Expand Down Expand Up @@ -116,7 +116,7 @@ To update a service you can use:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_composer_environment_deferrable_mode]
Expand All @@ -138,7 +138,7 @@ To delete a service you can use:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_composer_environment_deferrable_mode]
Expand Down
1 change: 1 addition & 0 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator",
"airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator",
"airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
"airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerEnvironmentSensor",
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360CreateQueryOperator",
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360RunQueryOperator",
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360DownloadReportV2Operator",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@
)
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")

DAG_ID = "example_composer"

REGION = "us-central1"

# [START howto_operator_composer_simple_environment]

ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
"config": {
"software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
"software_config": {"image_version": "composer-2.5.0-airflow-2.5.3"},
}
}
# [END howto_operator_composer_simple_environment]

# [START howto_operator_composer_update_environment]
UPDATED_ENVIRONMENT = {
"labels": {
"label1": "testing",
"label": "testing",
}
}
UPDATE_MASK = {"paths": ["labels.label1"]}
UPDATE_MASK = {"paths": ["labels.label"]}
# [END howto_operator_composer_update_environment]


Expand Down Expand Up @@ -85,6 +85,17 @@
)
# [END howto_operator_create_composer_environment]

# [START howto_operator_create_composer_environment_deferrable_mode]
defer_create_env = CloudComposerCreateEnvironmentOperator(
task_id="defer_create_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
environment=ENVIRONMENT,
deferrable=True,
)
# [END howto_operator_create_composer_environment_deferrable_mode]

# [START howto_operator_list_composer_environments]
list_envs = CloudComposerListEnvironmentsOperator(
task_id="list_envs", project_id=PROJECT_ID, region=REGION
Expand All @@ -111,6 +122,18 @@
)
# [END howto_operator_update_composer_environment]

# [START howto_operator_update_composer_environment_deferrable_mode]
defer_update_env = CloudComposerUpdateEnvironmentOperator(
task_id="defer_update_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
update_mask=UPDATE_MASK,
environment=UPDATED_ENVIRONMENT,
deferrable=True,
)
# [END howto_operator_update_composer_environment_deferrable_mode]

# [START howto_operator_delete_composer_environment]
delete_env = CloudComposerDeleteEnvironmentOperator(
task_id="delete_env",
Expand All @@ -121,7 +144,25 @@
# [END howto_operator_delete_composer_environment]
delete_env.trigger_rule = TriggerRule.ALL_DONE

chain(image_versions, create_env, list_envs, get_env, update_env, delete_env)
# [START howto_operator_delete_composer_environment_deferrable_mode]
defer_delete_env = CloudComposerDeleteEnvironmentOperator(
task_id="defer_delete_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
deferrable=True,
)
# [END howto_operator_delete_composer_environment_deferrable_mode]
defer_delete_env.trigger_rule = TriggerRule.ALL_DONE

chain(
image_versions,
[create_env, defer_create_env],
list_envs,
get_env,
[update_env, defer_update_env],
[delete_env, defer_delete_env],
)

from tests.system.utils.watcher import watcher

Expand Down

This file was deleted.

0 comments on commit 373d8a5

Please sign in to comment.