Skip to content

Commit

Permalink
PubSub assets & system tests migration (AIP-47) (#24867)
Browse files Browse the repository at this point in the history
  • Loading branch information
wojsamjan committed Jul 12, 2022
1 parent 2a0d3d1 commit 93992f2
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 113 deletions.
71 changes: 71 additions & 0 deletions airflow/providers/google/cloud/links/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Pub/Sub links."""
from typing import TYPE_CHECKING, Optional

from airflow.models import BaseOperator
from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context

PUBSUB_BASE_LINK = "https://console.cloud.google.com/cloudpubsub"
PUBSUB_TOPIC_LINK = PUBSUB_BASE_LINK + "/topic/detail/{topic_id}?project={project_id}"
PUBSUB_SUBSCRIPTION_LINK = PUBSUB_BASE_LINK + "/subscription/detail/{subscription_id}?project={project_id}"


class PubSubTopicLink(BaseGoogleLink):
"""Helper class for constructing Pub/Sub Topic Link"""

name = "Pub/Sub Topic"
key = "pubsub_topic"
format_str = PUBSUB_TOPIC_LINK

@staticmethod
def persist(
context: "Context",
task_instance: BaseOperator,
topic_id: str,
project_id: Optional[str],
):
task_instance.xcom_push(
context,
key=PubSubTopicLink.key,
value={"topic_id": topic_id, "project_id": project_id},
)


class PubSubSubscriptionLink(BaseGoogleLink):
"""Helper class for constructing Pub/Sub Subscription Link"""

name = "Pub/Sub Subscription"
key = "pubsub_subscription"
format_str = PUBSUB_SUBSCRIPTION_LINK

@staticmethod
def persist(
context: "Context",
task_instance: BaseOperator,
subscription_id: Optional[str],
project_id: Optional[str],
):
task_instance.xcom_push(
context,
key=PubSubSubscriptionLink.key,
value={"subscription_id": subscription_id, "project_id": project_id},
)
15 changes: 15 additions & 0 deletions airflow/providers/google/cloud/operators/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.providers.google.cloud.links.pubsub import PubSubSubscriptionLink, PubSubTopicLink

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -117,6 +118,7 @@ class PubSubCreateTopicOperator(BaseOperator):
'impersonation_chain',
)
ui_color = '#0273d4'
operator_extra_links = (PubSubTopicLink(),)

def __init__(
self,
Expand Down Expand Up @@ -170,6 +172,12 @@ def execute(self, context: 'Context') -> None:
metadata=self.metadata,
)
self.log.info("Created topic %s", self.topic)
PubSubTopicLink.persist(
context=context,
task_instance=self,
topic_id=self.topic,
project_id=self.project_id or hook.project_id,
)


class PubSubCreateSubscriptionOperator(BaseOperator):
Expand Down Expand Up @@ -305,6 +313,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
'impersonation_chain',
)
ui_color = '#0273d4'
operator_extra_links = (PubSubSubscriptionLink(),)

def __init__(
self,
Expand Down Expand Up @@ -385,6 +394,12 @@ def execute(self, context: 'Context') -> str:
)

self.log.info("Created subscription for topic %s", self.topic)
PubSubSubscriptionLink.persist(
context=context,
task_instance=self,
subscription_id=self.subscription or result, # result returns subscription name
project_id=self.project_id or hook.project_id,
)
return result


Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,8 @@ extra-links:
- airflow.providers.google.cloud.links.stackdriver.StackdriverPoliciesLink
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEngineClusterLink
- airflow.providers.google.cloud.links.kubernetes_engine.KubernetesEnginePodLink
- airflow.providers.google.cloud.links.pubsub.PubSubSubscriptionLink
- airflow.providers.google.cloud.links.pubsub.PubSubTopicLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink

Expand Down
18 changes: 9 additions & 9 deletions docs/apache-airflow-providers-google/operators/cloud/pubsub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Creating a PubSub topic
The PubSub topic is a named resource to which messages are sent by publishers.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator` operator creates a topic.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_create_topic]
:end-before: [END howto_operator_gcp_pubsub_create_topic]
Expand All @@ -56,7 +56,7 @@ A ``Subscription`` is a named resource representing the stream of messages from
to be delivered to the subscribing application.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator` operator creates the subscription.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_create_subscription]
:end-before: [END howto_operator_gcp_pubsub_create_subscription]
Expand All @@ -70,7 +70,7 @@ Publishing PubSub messages
A ``Message`` is a combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator` operator would publish messages.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_publish]
:end-before: [END howto_operator_gcp_pubsub_publish]
Expand All @@ -83,24 +83,24 @@ Pulling messages from a PubSub subscription
The :class:`~airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor` sensor pulls messages from a PubSub subscription
and pass them through XCom.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_pull_message_with_sensor]
:end-before: [END howto_operator_gcp_pubsub_pull_message_with_sensor]

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_pull_message_with_operator]
:end-before: [END howto_operator_gcp_pubsub_pull_message_with_operator]

To pull messages from XCom use the :class:`~airflow.operators.bash.BashOperator`.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_pull_messages_result_cmd]
:end-before: [END howto_operator_gcp_pubsub_pull_messages_result_cmd]

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_pull_messages_result]
:end-before: [END howto_operator_gcp_pubsub_pull_messages_result]
Expand All @@ -113,7 +113,7 @@ Deleting a PubSub subscription

The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator` operator deletes the subscription.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_unsubscribe]
:end-before: [END howto_operator_gcp_pubsub_unsubscribe]
Expand All @@ -126,7 +126,7 @@ Deleting a PubSub topic

The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator` operator deletes topic.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_pubsub.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/pubsub/example_pubsub.py
:language: python
:start-after: [START howto_operator_gcp_pubsub_delete_topic]
:end-before: [END howto_operator_gcp_pubsub_delete_topic]
Expand Down
15 changes: 10 additions & 5 deletions tests/providers/google/cloud/operators/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def test_failifexists(self, mock_hook):
task_id=TASK_ID, project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True
)

operator.execute(None)
context = mock.MagicMock()
operator.execute(context=context)
mock_hook.return_value.create_topic.assert_called_once_with(
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
Expand All @@ -69,7 +70,8 @@ def test_succeedifexists(self, mock_hook):
task_id=TASK_ID, project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=False
)

operator.execute(None)
context = mock.MagicMock()
operator.execute(context=context)
mock_hook.return_value.create_topic.assert_called_once_with(
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
Expand Down Expand Up @@ -106,7 +108,8 @@ def test_execute(self, mock_hook):
task_id=TASK_ID, project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION
)
mock_hook.return_value.create_subscription.return_value = TEST_SUBSCRIPTION
response = operator.execute(None)
context = mock.MagicMock()
response = operator.execute(context=context)
mock_hook.return_value.create_subscription.assert_called_once_with(
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
Expand Down Expand Up @@ -140,7 +143,8 @@ def test_execute_different_project_ids(self, mock_hook):
task_id=TASK_ID,
)
mock_hook.return_value.create_subscription.return_value = TEST_SUBSCRIPTION
response = operator.execute(None)
context = mock.MagicMock()
response = operator.execute(context=context)
mock_hook.return_value.create_subscription.assert_called_once_with(
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
Expand Down Expand Up @@ -169,7 +173,8 @@ def test_execute_no_subscription(self, mock_hook):
task_id=TASK_ID, project_id=TEST_PROJECT, topic=TEST_TOPIC
)
mock_hook.return_value.create_subscription.return_value = TEST_SUBSCRIPTION
response = operator.execute(None)
context = mock.MagicMock()
response = operator.execute(context=context)
mock_hook.return_value.create_subscription.assert_called_once_with(
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -15,25 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest

from tests.providers.google.cloud.utils.gcp_authenticator import GCP_PUBSUB_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.credential_file(GCP_PUBSUB_KEY)
class PubSubSystemTest(GoogleSystemTest):
def setUp(self):
super().setUp()

@provide_gcp_context(GCP_PUBSUB_KEY)
def test_run_example_sensor_dag(self):
self.run_dag(dag_id="example_gcp_pubsub_sensor", dag_folder=CLOUD_DAG_FOLDER)

@provide_gcp_context(GCP_PUBSUB_KEY)
def test_run_example_operator_dag(self):
self.run_dag(dag_id="example_gcp_pubsub_operator", dag_folder=CLOUD_DAG_FOLDER)

def tearDown(self):
super().tearDown()

0 comments on commit 93992f2

Please sign in to comment.