Skip to content

Commit

Permalink
Support google-cloud-pubsub>=2.0.0 (#13127)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Dec 22, 2020
1 parent b26b0df commit 8c00ec8
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 165 deletions.
81 changes: 43 additions & 38 deletions airflow/providers/google/cloud/hooks/pubsub.py
Expand Up @@ -111,7 +111,7 @@ def publish(
self._validate_messages(messages)

publisher = self.get_conn()
topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member
topic_path = f"projects/{project_id}/topics/{topic}"

self.log.info("Publish %d messages to topic (path) %s", len(messages), topic_path)
try:
Expand Down Expand Up @@ -206,7 +206,7 @@ def create_topic(
:type metadata: Sequence[Tuple[str, str]]]
"""
publisher = self.get_conn()
topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member
topic_path = f"projects/{project_id}/topics/{topic}"

# Add airflow-version label to the topic
labels = labels or {}
Expand All @@ -216,13 +216,15 @@ def create_topic(
try:
# pylint: disable=no-member
publisher.create_topic(
name=topic_path,
labels=labels,
message_storage_policy=message_storage_policy,
kms_key_name=kms_key_name,
request={
"name": topic_path,
"labels": labels,
"message_storage_policy": message_storage_policy,
"kms_key_name": kms_key_name,
},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
except AlreadyExists:
self.log.warning('Topic already exists: %s', topic)
Expand Down Expand Up @@ -266,16 +268,13 @@ def delete_topic(
:type metadata: Sequence[Tuple[str, str]]]
"""
publisher = self.get_conn()
topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member
topic_path = f"projects/{project_id}/topics/{topic}"

self.log.info("Deleting topic (path) %s", topic_path)
try:
# pylint: disable=no-member
publisher.delete_topic(
topic=topic_path,
retry=retry,
timeout=timeout,
metadata=metadata,
request={"topic": topic_path}, retry=retry, timeout=timeout, metadata=metadata or ()
)
except NotFound:
self.log.warning('Topic does not exist: %s', topic_path)
Expand Down Expand Up @@ -401,27 +400,29 @@ def create_subscription(
labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')

# pylint: disable=no-member
subscription_path = SubscriberClient.subscription_path(subscription_project_id, subscription)
topic_path = SubscriberClient.topic_path(project_id, topic)
subscription_path = f"projects/{subscription_project_id}/subscriptions/{subscription}"
topic_path = f"projects/{project_id}/topics/{topic}"

self.log.info("Creating subscription (path) %s for topic (path) %a", subscription_path, topic_path)
try:
subscriber.create_subscription(
name=subscription_path,
topic=topic_path,
push_config=push_config,
ack_deadline_seconds=ack_deadline_secs,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration,
labels=labels,
enable_message_ordering=enable_message_ordering,
expiration_policy=expiration_policy,
filter_=filter_,
dead_letter_policy=dead_letter_policy,
retry_policy=retry_policy,
request={
"name": subscription_path,
"topic": topic_path,
"push_config": push_config,
"ack_deadline_seconds": ack_deadline_secs,
"retain_acked_messages": retain_acked_messages,
"message_retention_duration": message_retention_duration,
"labels": labels,
"enable_message_ordering": enable_message_ordering,
"expiration_policy": expiration_policy,
"filter": filter_,
"dead_letter_policy": dead_letter_policy,
"retry_policy": retry_policy,
},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
except AlreadyExists:
self.log.warning('Subscription already exists: %s', subscription_path)
Expand Down Expand Up @@ -466,13 +467,16 @@ def delete_subscription(
"""
subscriber = self.subscriber_client
# noqa E501 # pylint: disable=no-member
subscription_path = SubscriberClient.subscription_path(project_id, subscription)
subscription_path = f"projects/{project_id}/subscriptions/{subscription}"

self.log.info("Deleting subscription (path) %s", subscription_path)
try:
# pylint: disable=no-member
subscriber.delete_subscription(
subscription=subscription_path, retry=retry, timeout=timeout, metadata=metadata
request={"subscription": subscription_path},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)

except NotFound:
Expand Down Expand Up @@ -527,18 +531,20 @@ def pull(
"""
subscriber = self.subscriber_client
# noqa E501 # pylint: disable=no-member,line-too-long
subscription_path = SubscriberClient.subscription_path(project_id, subscription)
subscription_path = f"projects/{project_id}/subscriptions/{subscription}"

self.log.info("Pulling max %d messages from subscription (path) %s", max_messages, subscription_path)
try:
# pylint: disable=no-member
response = subscriber.pull(
subscription=subscription_path,
max_messages=max_messages,
return_immediately=return_immediately,
request={
"subscription": subscription_path,
"max_messages": max_messages,
"return_immediately": return_immediately,
},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
result = getattr(response, 'received_messages', [])
self.log.info("Pulled %d messages from subscription (path) %s", len(result), subscription_path)
Expand Down Expand Up @@ -591,17 +597,16 @@ def acknowledge(

subscriber = self.subscriber_client
# noqa E501 # pylint: disable=no-member
subscription_path = SubscriberClient.subscription_path(project_id, subscription)
subscription_path = f"projects/{project_id}/subscriptions/{subscription}"

self.log.info("Acknowledging %d ack_ids from subscription (path) %s", len(ack_ids), subscription_path)
try:
# pylint: disable=no-member
subscriber.acknowledge(
subscription=subscription_path,
ack_ids=ack_ids,
request={"subscription": subscription_path, "ack_ids": ack_ids},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
except (HttpError, GoogleAPICallError) as e:
raise PubSubException(
Expand Down
3 changes: 1 addition & 2 deletions airflow/providers/google/cloud/operators/pubsub.py
Expand Up @@ -29,7 +29,6 @@
ReceivedMessage,
RetryPolicy,
)
from google.protobuf.json_format import MessageToDict

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
Expand Down Expand Up @@ -958,6 +957,6 @@ def _default_message_callback(
:param context: same as in `execute`
:return: value to be saved to XCom.
"""
messages_json = [MessageToDict(m) for m in pulled_messages]
messages_json = [ReceivedMessage.to_dict(m) for m in pulled_messages]

return messages_json
3 changes: 1 addition & 2 deletions airflow/providers/google/cloud/sensors/pubsub.py
Expand Up @@ -20,7 +20,6 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

from google.cloud.pubsub_v1.types import ReceivedMessage
from google.protobuf.json_format import MessageToDict

from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.sensors.base import BaseSensorOperator
Expand Down Expand Up @@ -200,6 +199,6 @@ def _default_message_callback(
:param context: same as in `execute`
:return: value to be saved to XCom.
"""
messages_json = [MessageToDict(m) for m in pulled_messages]
messages_json = [ReceivedMessage.to_dict(m) for m in pulled_messages]

return messages_json
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -266,7 +266,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
'google-cloud-memcache>=0.2.0',
'google-cloud-monitoring>=0.34.0,<2.0.0',
'google-cloud-os-login>=2.0.0,<3.0.0',
'google-cloud-pubsub>=1.0.0,<2.0.0',
'google-cloud-pubsub>=2.0.0,<3.0.0',
'google-cloud-redis>=0.3.0,<2.0.0',
'google-cloud-secret-manager>=0.2.0,<2.0.0',
'google-cloud-spanner>=1.10.0,<2.0.0',
Expand Down

0 comments on commit 8c00ec8

Please sign in to comment.