Skip to content

Commit

Permalink
[AIRFLOW-5743] Move Google PubSub to providers package (#6476)
Browse files Browse the repository at this point in the history
  • Loading branch information
ratb3rt authored and feluelle committed Nov 4, 2019
1 parent 45e108d commit a296cda
Show file tree
Hide file tree
Showing 17 changed files with 95 additions and 61 deletions.
28 changes: 14 additions & 14 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ It is required now to pass key-word only arguments to `PubSub` hook.
These changes are not backward compatible.

Affected components:
* airflow.gcp.hooks.pubsub.PubSubHook
* airflow.gcp.operators.pubsub.PubSubTopicCreateOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionCreateOperator
* airflow.gcp.operators.pubsub.PubSubTopicDeleteOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionDeleteOperator
* airflow.gcp.operators.pubsub.PubSubPublishOperator
* airflow.gcp.sensors.pubsub.PubSubPullSensor
* airflow.providers.google.cloud.hooks.pubsub.PubSubHook
* airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator
* airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator
* airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor

### Changes to `aws_default` Connection's default region

Expand Down Expand Up @@ -225,7 +225,7 @@ The following table shows changes in import paths.
|airflow.contrib.hooks.gcp_kms_hook.GoogleCloudKMSHook |airflow.gcp.hooks.kms.GoogleCloudKMSHook |
|airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook |airflow.gcp.hooks.mlengine.MLEngineHook |
|airflow.contrib.hooks.gcp_natural_language_hook.CloudNaturalLanguageHook |airflow.providers.google.cloud.hooks.natural_language.CloudNaturalLanguageHook |
|airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook |airflow.gcp.hooks.pubsub.PubSubHook |
|airflow.contrib.hooks.gcp_pubsub_hook.PubSubHook |airflow.providers.google.cloud.hooks.pubsub.PubSubHook |
|airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook |airflow.gcp.hooks.speech_to_text.CloudSpeechToTextHook |
|airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook |airflow.gcp.hooks.spanner.SpannerHook |
|airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook |airflow.gcp.hooks.speech_to_text.GCPSpeechToTextHook |
Expand Down Expand Up @@ -377,19 +377,19 @@ The following table shows changes in import paths.
|airflow.contrib.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator |airflow.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator |
|airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator |airflow.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator |
|airflow.contrib.operators.postgres_to_gcs_operator.PostgresToGoogleCloudStorageOperator |airflow.operators.postgres_to_gcs.PostgresToGoogleCloudStorageOperator |
|airflow.contrib.operators.pubsub_operator.PubSubPublishOperator |airflow.gcp.operators.pubsub.PubSubPublishOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator |airflow.gcp.operators.pubsub.PubSubSubscriptionCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator |airflow.gcp.operators.pubsub.PubSubSubscriptionDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator |airflow.gcp.operators.pubsub.PubSubTopicCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator |airflow.gcp.operators.pubsub.PubSubTopicDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubPublishOperator |airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubSubscriptionDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator |airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator |
|airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator |airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator |
|airflow.contrib.operators.sql_to_gcs.BaseSQLToGoogleCloudStorageOperator |airflow.operators.sql_to_gcs.BaseSQLToGoogleCloudStorageOperator |
|airflow.contrib.sensors.bigquery_sensor.BigQueryTableSensor |airflow.gcp.sensors.bigquery.BigQueryTableSensor |
|airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceWaitForJobStatusSensor |airflow.gcp.sensors.cloud_storage_transfer_service.GCPTransferServiceWaitForJobStatusSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectSensor |airflow.gcp.sensors.gcs.GoogleCloudStorageObjectSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectUpdatedSensor |airflow.gcp.sensors.gcs.GoogleCloudStorageObjectUpdatedSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStoragePrefixSensor |airflow.gcp.sensors.gcs.GoogleCloudStoragePrefixSensor |
|airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageUploadSessionCompleteSensor |airflow.gcp.sensors.gcs.GoogleCloudStorageUploadSessionCompleteSensor |
|airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor |airflow.gcp.sensors.pubsub.PubSubPullSensor |
|airflow.contrib.sensors.pubsub_sensor.PubSubPullSensor |airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor |


### Remove provide_context
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/hooks/gcp_pubsub_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.hooks.pubsub`."""
"""This module is deprecated. Please use `airflow.providers.google.cloud.hooks.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.gcp.hooks.pubsub import PubSubException, PubSubHook # noqa
from airflow.providers.google.cloud.hooks.pubsub import PubSubException, PubSubHook # noqa

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.hooks.pubsub`.",
"This module is deprecated. Please use `airflow.providers.google.cloud.hooks.pubsub`.",
DeprecationWarning, stacklevel=2
)
6 changes: 3 additions & 3 deletions airflow/contrib/operators/pubsub_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.operators.pubsub`."""
"""This module is deprecated. Please use `airflow.providers.google.cloud.operators.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.gcp.operators.pubsub import ( # noqa
from airflow.providers.google.cloud.operators.pubsub import ( # noqa
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
)

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.operators.pubsub`.",
"This module is deprecated. Please use `airflow.providers.google.cloud.operators.pubsub`.",
DeprecationWarning, stacklevel=2
)
6 changes: 3 additions & 3 deletions airflow/contrib/sensors/pubsub_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use `airflow.gcp.sensors.pubsub`."""
"""This module is deprecated. Please use `airflow.providers.google.cloud.sensors.pubsub`."""

import warnings

# pylint: disable=unused-import
from airflow.gcp.sensors.pubsub import PubSubPullSensor # noqa
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor # noqa

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.sensors.pubsub`.",
"This module is deprecated. Please use `airflow.providers.google.cloud.sensors.pubsub`.",
DeprecationWarning, stacklevel=2
)
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

import airflow
from airflow import models
from airflow.gcp.operators.pubsub import (
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.pubsub import (
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
)
from airflow.gcp.sensors.pubsub import PubSubPullSensor
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
TOPIC = "PubSubTestTopic"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from google.api_core.retry import Retry
from google.cloud.pubsub_v1.types import Duration, MessageStoragePolicy, PushConfig

from airflow.gcp.hooks.pubsub import PubSubHook
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.utils.decorators import apply_defaults


Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/google/cloud/sensors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from google.protobuf.json_format import MessageToDict

from airflow.gcp.hooks.pubsub import PubSubHook
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

Expand Down
2 changes: 1 addition & 1 deletion docs/autoapi_templates/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ All operators are in the following packages:

airflow/providers/google/marketing_platform/operators/index

airflow/providers/google/cloud/operators/index
airflow/providers/google/cloud/sensors/index

airflow/providers/google/marketing_platform/sensors/index

Expand Down
6 changes: 3 additions & 3 deletions docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -607,9 +607,9 @@ These integrations allow you to perform various operations within the Google Clo

* - `Cloud Pub/Sub <https://cloud.google.com/pubsub/>`__
-
- :mod:`airflow.gcp.hooks.pubsub`
- :mod:`airflow.gcp.operators.pubsub`
- :mod:`airflow.gcp.sensors.pubsub`
- :mod:`airflow.providers.google.cloud.hooks.pubsub`
- :mod:`airflow.providers.google.cloud.operators.pubsub`
- :mod:`airflow.providers.google.cloud.sensors.pubsub`

* - `Cloud Spanner <https://cloud.google.com/spanner/>`__
- :doc:`How to use <howto/operator/gcp/spanner>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
from googleapiclient.errors import HttpError
from parameterized import parameterized

from airflow.gcp.hooks.pubsub import PubSubException, PubSubHook
from airflow.providers.google.cloud.hooks.pubsub import PubSubException, PubSubHook
from airflow.version import version
from tests.compat import mock

BASE_STRING = 'airflow.gcp.hooks.base.{}'
PUBSUB_STRING = 'airflow.gcp.hooks.pubsub.{}'
PUBSUB_STRING = 'airflow.providers.google.cloud.hooks.pubsub.{}'

EMPTY_CONTENT = b''
TEST_PROJECT = 'test-project'
Expand Down Expand Up @@ -59,9 +59,10 @@ def setUp(self):
new=mock_init):
self.pubsub_hook = PubSubHook(gcp_conn_id='test')

@mock.patch("airflow.gcp.hooks.pubsub.PubSubHook.client_info", new_callable=mock.PropertyMock)
@mock.patch("airflow.gcp.hooks.pubsub.PubSubHook._get_credentials")
@mock.patch("airflow.gcp.hooks.pubsub.PublisherClient")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.client_info",
new_callable=mock.PropertyMock)
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook._get_credentials")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
def test_publisher_client_creation(self, mock_client, mock_get_creds, mock_client_info):
self.assertIsNone(self.pubsub_hook._client)
result = self.pubsub_hook.get_conn()
Expand All @@ -72,9 +73,10 @@ def test_publisher_client_creation(self, mock_client, mock_get_creds, mock_clien
self.assertEqual(mock_client.return_value, result)
self.assertEqual(self.pubsub_hook._client, result)

@mock.patch("airflow.gcp.hooks.pubsub.PubSubHook.client_info", new_callable=mock.PropertyMock)
@mock.patch("airflow.gcp.hooks.pubsub.PubSubHook._get_credentials")
@mock.patch("airflow.gcp.hooks.pubsub.SubscriberClient")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.client_info",
new_callable=mock.PropertyMock)
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook._get_credentials")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.SubscriberClient")
def test_subscriber_client_creation(self, mock_client, mock_get_creds, mock_client_info):
self.assertIsNone(self.pubsub_hook._client)
result = self.pubsub_hook.subscriber_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import unittest

from airflow.gcp.operators.pubsub import (
from airflow.providers.google.cloud.operators.pubsub import (
PubSubPublishOperator, PubSubSubscriptionCreateOperator, PubSubSubscriptionDeleteOperator,
PubSubTopicCreateOperator, PubSubTopicDeleteOperator,
)
Expand All @@ -41,7 +41,7 @@

class TestPubSubTopicCreateOperator(unittest.TestCase):

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_failifexists(self, mock_hook):
operator = PubSubTopicCreateOperator(
task_id=TASK_ID,
Expand All @@ -63,7 +63,7 @@ def test_failifexists(self, mock_hook):
metadata=None,
)

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_succeedifexists(self, mock_hook):
operator = PubSubTopicCreateOperator(
task_id=TASK_ID,
Expand All @@ -88,7 +88,7 @@ def test_succeedifexists(self, mock_hook):

class TestPubSubTopicDeleteOperator(unittest.TestCase):

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_execute(self, mock_hook):
operator = PubSubTopicDeleteOperator(
task_id=TASK_ID,
Expand All @@ -109,7 +109,7 @@ def test_execute(self, mock_hook):

class TestPubSubSubscriptionCreateOperator(unittest.TestCase):

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_execute(self, mock_hook):
operator = PubSubSubscriptionCreateOperator(
task_id=TASK_ID,
Expand All @@ -136,7 +136,7 @@ def test_execute(self, mock_hook):
)
self.assertEqual(response, TEST_SUBSCRIPTION)

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_execute_different_project_ids(self, mock_hook):
another_project = 'another-project'
operator = PubSubSubscriptionCreateOperator(
Expand Down Expand Up @@ -165,7 +165,7 @@ def test_execute_different_project_ids(self, mock_hook):
)
self.assertEqual(response, TEST_SUBSCRIPTION)

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_execute_no_subscription(self, mock_hook):
operator = PubSubSubscriptionCreateOperator(
task_id=TASK_ID,
Expand Down Expand Up @@ -194,7 +194,7 @@ def test_execute_no_subscription(self, mock_hook):

class TestPubSubSubscriptionDeleteOperator(unittest.TestCase):

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_execute(self, mock_hook):
operator = PubSubSubscriptionDeleteOperator(
task_id=TASK_ID,
Expand All @@ -215,7 +215,7 @@ def test_execute(self, mock_hook):

class TestPubSubPublishOperator(unittest.TestCase):

@mock.patch('airflow.gcp.operators.pubsub.PubSubHook')
@mock.patch('airflow.providers.google.cloud.operators.pubsub.PubSubHook')
def test_publish(self, mock_hook):
operator = PubSubPublishOperator(task_id=TASK_ID,
project_id=TEST_PROJECT,
Expand Down
File renamed without changes.
16 changes: 16 additions & 0 deletions tests/providers/google/cloud/sensors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

0 comments on commit a296cda

Please sign in to comment.