Skip to content

Commit

Permalink
Add missing params to GCP Pub/Sub creation_subscription (#10106)
Browse files Browse the repository at this point in the history
Add missing params to GCP Pub/Sub creation_subscription hook/operator
  • Loading branch information
ryanyuan committed Aug 2, 2020
1 parent 011c07a commit 85c56b1
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 2 deletions.
41 changes: 40 additions & 1 deletion airflow/providers/google/cloud/hooks/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
from google.api_core.retry import Retry
from google.cloud.exceptions import NotFound
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig, ReceivedMessage
from google.cloud.pubsub_v1.types import (
DeadLetterPolicy, Duration, ExpirationPolicy, MessageStoragePolicy, PushConfig, ReceivedMessage,
RetryPolicy,
)
from googleapiclient.errors import HttpError

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
Expand Down Expand Up @@ -300,6 +303,11 @@ def create_subscription(
retain_acked_messages: Optional[bool] = None,
message_retention_duration: Optional[Union[Dict, Duration]] = None,
labels: Optional[Dict[str, str]] = None,
enable_message_ordering: bool = False,
expiration_policy: Optional[Union[Dict, ExpirationPolicy]] = None,
filter_: Optional[str] = None,
dead_letter_policy: Optional[Union[Dict, DeadLetterPolicy]] = None,
retry_policy: Optional[Union[Dict, RetryPolicy]] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
Expand Down Expand Up @@ -345,6 +353,32 @@ def create_subscription(
:param labels: Client-assigned labels; see
https://cloud.google.com/pubsub/docs/labels
:type labels: Dict[str, str]
:param enable_message_ordering: If true, messages published with the same
ordering_key in PubsubMessage will be delivered to the subscribers in the order
in which they are received by the Pub/Sub system. Otherwise, they may be
delivered in any order.
:type enable_message_ordering: bool
:param expiration_policy: A policy that specifies the conditions for this
subscription’s expiration. A subscription is considered active as long as any
connected subscriber is successfully consuming messages from the subscription or
is issuing operations on the subscription. If expiration_policy is not set,
a default policy with ttl of 31 days will be used. The minimum allowed value for
expiration_policy.ttl is 1 day.
:type expiration_policy: Union[Dict, google.cloud.pubsub_v1.types.ExpirationPolicy`]
:param filter_: An expression written in the Cloud Pub/Sub filter language. If
non-empty, then only PubsubMessages whose attributes field matches the filter are
delivered on this subscription. If empty, then no messages are filtered out.
:type filter_: str
:param dead_letter_policy: A policy that specifies the conditions for dead lettering
messages in this subscription. If dead_letter_policy is not set, dead lettering is
disabled.
:type dead_letter_policy: Union[Dict, google.cloud.pubsub_v1.types.DeadLetterPolicy]
:param retry_policy: A policy that specifies how Pub/Sub retries message delivery
for this subscription. If not set, the default retry policy is applied. This
generally implies that messages will be retried as soon as possible for healthy
subscribers. RetryPolicy will be triggered on NACKs or acknowledgement deadline
exceeded events for a given message.
:type retry_policy: Union[Dict, google.cloud.pubsub_v1.types.RetryPolicy]
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
Expand Down Expand Up @@ -383,6 +417,11 @@ def create_subscription(
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration,
labels=labels,
enable_message_ordering=enable_message_ordering,
expiration_policy=expiration_policy,
filter_=filter_,
dead_letter_policy=dead_letter_policy,
retry_policy=retry_policy,
retry=retry,
timeout=timeout,
metadata=metadata,
Expand Down
47 changes: 46 additions & 1 deletion airflow/providers/google/cloud/operators/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

from google.api_core.retry import Retry
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig, ReceivedMessage
from google.cloud.pubsub_v1.types import (
DeadLetterPolicy, Duration, ExpirationPolicy, MessageStoragePolicy, PushConfig, ReceivedMessage,
RetryPolicy,
)
from google.protobuf.json_format import MessageToDict

from airflow.models import BaseOperator
Expand Down Expand Up @@ -164,6 +167,7 @@ def execute(self, context):
self.log.info("Created topic %s", self.topic)


# pylint: disable=too-many-instance-attributes
class PubSubCreateSubscriptionOperator(BaseOperator):
"""Create a PubSub subscription.
Expand Down Expand Up @@ -257,6 +261,32 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
:param labels: Client-assigned labels; see
https://cloud.google.com/pubsub/docs/labels
:type labels: Dict[str, str]
:param enable_message_ordering: If true, messages published with the same
ordering_key in PubsubMessage will be delivered to the subscribers in the order
in which they are received by the Pub/Sub system. Otherwise, they may be
delivered in any order.
:type enable_message_ordering: bool
:param expiration_policy: A policy that specifies the conditions for this
subscription’s expiration. A subscription is considered active as long as any
connected subscriber is successfully consuming messages from the subscription or
is issuing operations on the subscription. If expiration_policy is not set,
a default policy with ttl of 31 days will be used. The minimum allowed value for
expiration_policy.ttl is 1 day.
:type expiration_policy: Union[Dict, google.cloud.pubsub_v1.types.ExpirationPolicy`]
:param filter_: An expression written in the Cloud Pub/Sub filter language. If
non-empty, then only PubsubMessages whose attributes field matches the filter are
delivered on this subscription. If empty, then no messages are filtered out.
:type filter_: str
:param dead_letter_policy: A policy that specifies the conditions for dead lettering
messages in this subscription. If dead_letter_policy is not set, dead lettering is
disabled.
:type dead_letter_policy: Union[Dict, google.cloud.pubsub_v1.types.DeadLetterPolicy]
:param retry_policy: A policy that specifies how Pub/Sub retries message delivery
for this subscription. If not set, the default retry policy is applied. This
generally implies that messages will be retried as soon as possible for healthy
subscribers. RetryPolicy will be triggered on NACKs or acknowledgement deadline
exceeded events for a given message.
:type retry_policy: Union[Dict, google.cloud.pubsub_v1.types.RetryPolicy]
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
Expand Down Expand Up @@ -291,6 +321,11 @@ def __init__(
retain_acked_messages: Optional[bool] = None,
message_retention_duration: Optional[Union[Dict, Duration]] = None,
labels: Optional[Dict[str, str]] = None,
enable_message_ordering: bool = False,
expiration_policy: Optional[Union[Dict, ExpirationPolicy]] = None,
filter_: Optional[str] = None,
dead_letter_policy: Optional[Union[Dict, DeadLetterPolicy]] = None,
retry_policy: Optional[Union[Dict, RetryPolicy]] = None,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
Expand Down Expand Up @@ -324,6 +359,11 @@ def __init__(
self.retain_acked_messages = retain_acked_messages
self.message_retention_duration = message_retention_duration
self.labels = labels
self.enable_message_ordering = enable_message_ordering
self.expiration_policy = expiration_policy
self.filter_ = filter_
self.dead_letter_policy = dead_letter_policy
self.retry_policy = retry_policy
self.retry = retry
self.timeout = timeout
self.metadata = metadata
Expand All @@ -344,6 +384,11 @@ def execute(self, context):
retain_acked_messages=self.retain_acked_messages,
message_retention_duration=self.message_retention_duration,
labels=self.labels,
enable_message_ordering=self.enable_message_ordering,
expiration_policy=self.expiration_policy,
filter_=self.filter_,
dead_letter_policy=self.dead_letter_policy,
retry_policy=self.retry_policy,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata
Expand Down
49 changes: 49 additions & 0 deletions tests/providers/google/cloud/hooks/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ def test_create_nonexistent_subscription(self, mock_service):
retain_acked_messages=None,
message_retention_duration=None,
labels=LABELS,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
Expand All @@ -216,6 +221,11 @@ def test_create_subscription_different_project_topic(self, mock_service):
retain_acked_messages=None,
message_retention_duration=None,
labels=LABELS,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
Expand Down Expand Up @@ -271,6 +281,11 @@ def test_create_subscription_without_subscription_name(self, mock_uuid,
retain_acked_messages=None,
message_retention_duration=None,
labels=LABELS,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
Expand All @@ -292,6 +307,40 @@ def test_create_subscription_with_ack_deadline(self, mock_service):
retain_acked_messages=None,
message_retention_duration=None,
labels=LABELS,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
)
self.assertEqual(TEST_SUBSCRIPTION, response)

@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_create_subscription_with_filter(self, mock_service):
create_method = mock_service.create_subscription

response = self.pubsub_hook.create_subscription(
project_id=TEST_PROJECT,
topic=TEST_TOPIC,
subscription=TEST_SUBSCRIPTION,
filter_='attributes.domain="com"'
)
create_method.assert_called_once_with(
name=EXPANDED_SUBSCRIPTION,
topic=EXPANDED_TOPIC,
push_config=None,
ack_deadline_seconds=10,
retain_acked_messages=None,
message_retention_duration=None,
labels=LABELS,
enable_message_ordering=False,
expiration_policy=None,
filter_='attributes.domain="com"',
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
Expand Down
15 changes: 15 additions & 0 deletions tests/providers/google/cloud/operators/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ def test_execute(self, mock_hook):
retain_acked_messages=None,
message_retention_duration=None,
labels=None,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
Expand Down Expand Up @@ -158,6 +163,11 @@ def test_execute_different_project_ids(self, mock_hook):
retain_acked_messages=None,
message_retention_duration=None,
labels=None,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None
Expand All @@ -184,6 +194,11 @@ def test_execute_no_subscription(self, mock_hook):
retain_acked_messages=None,
message_retention_duration=None,
labels=None,
enable_message_ordering=False,
expiration_policy=None,
filter_=None,
dead_letter_policy=None,
retry_policy=None,
retry=None,
timeout=None,
metadata=None,
Expand Down

0 comments on commit 85c56b1

Please sign in to comment.