Skip to content

Commit bfad233

Browse files
authored
Fix providers tests in main branch with eager upgrades (#18040)
The SQS and DataCatalog were failing tests in main branch because some recent release of dependencies broke them: 1) SQS moto 2.2.6 broke SQS tests - the queue url in the 2.2.6+ version has to start with http:// or https:// 2) DataCatalog part of Google Provider incorrectly imported types and broke tests (used beta instad of datacatalog path)
1 parent 1be3ef6 commit bfad233

File tree

4 files changed

+29
-24
lines changed

4 files changed

+29
-24
lines changed

airflow/providers/google/cloud/hooks/datacatalog.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from google.api_core.retry import Retry
2121
from google.cloud import datacatalog
22-
from google.cloud.datacatalog_v1beta1 import (
22+
from google.cloud.datacatalog import (
2323
CreateTagRequest,
2424
DataCatalogClient,
2525
Entry,

tests/providers/amazon/aws/operators/test_sqs.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929

3030
DEFAULT_DATE = timezone.datetime(2019, 1, 1)
3131

32+
QUEUE_NAME = 'test-queue'
33+
QUEUE_URL = f'https://{QUEUE_NAME}'
34+
3235

3336
class TestSQSPublishOperator(unittest.TestCase):
3437
def setUp(self):
@@ -38,7 +41,7 @@ def setUp(self):
3841
self.operator = SQSPublishOperator(
3942
task_id='test_task',
4043
dag=self.dag,
41-
sqs_queue='test',
44+
sqs_queue=QUEUE_URL,
4245
message_content='hello',
4346
aws_conn_id='aws_default',
4447
)
@@ -48,13 +51,13 @@ def setUp(self):
4851

4952
@mock_sqs
5053
def test_execute_success(self):
51-
self.sqs_hook.create_queue('test')
54+
self.sqs_hook.create_queue(QUEUE_NAME)
5255

5356
result = self.operator.execute(self.mock_context)
5457
assert 'MD5OfMessageBody' in result
5558
assert 'MessageId' in result
5659

57-
message = self.sqs_hook.get_conn().receive_message(QueueUrl='test')
60+
message = self.sqs_hook.get_conn().receive_message(QueueUrl=QUEUE_URL)
5861

5962
assert len(message['Messages']) == 1
6063
assert message['Messages'][0]['MessageId'] == result['MessageId']

tests/providers/amazon/aws/sensors/test_sqs.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,26 @@
3232

3333
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
3434

35+
QUEUE_NAME = 'test-queue'
36+
QUEUE_URL = f'https://{QUEUE_NAME}'
37+
3538

3639
class TestSQSSensor(unittest.TestCase):
3740
def setUp(self):
3841
args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
3942

4043
self.dag = DAG('test_dag_id', default_args=args)
4144
self.sensor = SQSSensor(
42-
task_id='test_task', dag=self.dag, sqs_queue='test', aws_conn_id='aws_default'
45+
task_id='test_task', dag=self.dag, sqs_queue=QUEUE_URL, aws_conn_id='aws_default'
4346
)
4447

4548
self.mock_context = mock.MagicMock()
4649
self.sqs_hook = SQSHook()
4750

4851
@mock_sqs
4952
def test_poke_success(self):
50-
self.sqs_hook.create_queue('test')
51-
self.sqs_hook.send_message(queue_url='test', message_body='hello')
53+
self.sqs_hook.create_queue(QUEUE_NAME)
54+
self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')
5255

5356
result = self.sensor.poke(self.mock_context)
5457
assert result
@@ -60,7 +63,7 @@ def test_poke_success(self):
6063
@mock_sqs
6164
def test_poke_no_message_failed(self):
6265

63-
self.sqs_hook.create_queue('test')
66+
self.sqs_hook.create_queue(QUEUE_NAME)
6467
result = self.sensor.poke(self.mock_context)
6568
assert not result
6669

@@ -112,40 +115,40 @@ def test_poke_receive_raise_exception(self, mock_conn):
112115
@mock.patch.object(SQSHook, 'get_conn')
113116
def test_poke_visibility_timeout(self, mock_conn):
114117
# Check without visibility_timeout parameter
115-
self.sqs_hook.create_queue('test')
116-
self.sqs_hook.send_message(queue_url='test', message_body='hello')
118+
self.sqs_hook.create_queue(QUEUE_NAME)
119+
self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')
117120

118121
self.sensor.poke(self.mock_context)
119122

120123
calls_receive_message = [
121-
mock.call().receive_message(QueueUrl='test', MaxNumberOfMessages=5, WaitTimeSeconds=1)
124+
mock.call().receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=5, WaitTimeSeconds=1)
122125
]
123126
mock_conn.assert_has_calls(calls_receive_message)
124127
# Check with visibility_timeout parameter
125128
self.sensor = SQSSensor(
126129
task_id='test_task2',
127130
dag=self.dag,
128-
sqs_queue='test',
131+
sqs_queue=QUEUE_URL,
129132
aws_conn_id='aws_default',
130133
visibility_timeout=42,
131134
)
132135
self.sensor.poke(self.mock_context)
133136

134137
calls_receive_message = [
135138
mock.call().receive_message(
136-
QueueUrl='test', MaxNumberOfMessages=5, WaitTimeSeconds=1, VisibilityTimeout=42
139+
QueueUrl=QUEUE_URL, MaxNumberOfMessages=5, WaitTimeSeconds=1, VisibilityTimeout=42
137140
)
138141
]
139142
mock_conn.assert_has_calls(calls_receive_message)
140143

141144
@mock_sqs
142145
def test_poke_message_invalid_filtering(self):
143-
self.sqs_hook.create_queue('test')
144-
self.sqs_hook.send_message(queue_url='test', message_body='hello')
146+
self.sqs_hook.create_queue(QUEUE_NAME)
147+
self.sqs_hook.send_message(queue_url=QUEUE_URL, message_body='hello')
145148
sensor = SQSSensor(
146149
task_id='test_task2',
147150
dag=self.dag,
148-
sqs_queue='test',
151+
sqs_queue=QUEUE_URL,
149152
aws_conn_id='aws_default',
150153
message_filtering='invalid_option',
151154
)
@@ -155,7 +158,7 @@ def test_poke_message_invalid_filtering(self):
155158

156159
@mock.patch.object(SQSHook, "get_conn")
157160
def test_poke_message_filtering_literal_values(self, mock_conn):
158-
self.sqs_hook.create_queue('test')
161+
self.sqs_hook.create_queue(QUEUE_NAME)
159162
matching = [{"id": 11, "body": "a matching message"}]
160163
non_matching = [{"id": 12, "body": "a non-matching message"}]
161164
all = matching + non_matching
@@ -188,13 +191,13 @@ def mock_delete_message_batch(**kwargs):
188191
# Test that only filtered messages are deleted
189192
delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]
190193
calls_delete_message_batch = [
191-
mock.call().delete_message_batch(QueueUrl='test', Entries=delete_entries)
194+
mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)
192195
]
193196
mock_conn.assert_has_calls(calls_delete_message_batch)
194197

195198
@mock.patch.object(SQSHook, "get_conn")
196199
def test_poke_message_filtering_jsonpath(self, mock_conn):
197-
self.sqs_hook.create_queue('test')
200+
self.sqs_hook.create_queue(QUEUE_NAME)
198201
matching = [
199202
{"id": 11, "key": {"matches": [1, 2]}},
200203
{"id": 12, "key": {"matches": [3, 4, 5]}},
@@ -234,13 +237,13 @@ def mock_delete_message_batch(**kwargs):
234237
# Test that only filtered messages are deleted
235238
delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]
236239
calls_delete_message_batch = [
237-
mock.call().delete_message_batch(QueueUrl='test', Entries=delete_entries)
240+
mock.call().delete_message_batch(QueueUrl=QUEUE_URL, Entries=delete_entries)
238241
]
239242
mock_conn.assert_has_calls(calls_delete_message_batch)
240243

241244
@mock.patch.object(SQSHook, "get_conn")
242245
def test_poke_message_filtering_jsonpath_values(self, mock_conn):
243-
self.sqs_hook.create_queue('test')
246+
self.sqs_hook.create_queue(QUEUE_NAME)
244247
matching = [
245248
{"id": 11, "key": {"matches": [1, 2]}},
246249
{"id": 12, "key": {"matches": [1, 4, 5]}},
@@ -282,6 +285,6 @@ def mock_delete_message_batch(**kwargs):
282285
# Test that only filtered messages are deleted
283286
delete_entries = [{'Id': x['id'], 'ReceiptHandle': 100 + x['id']} for x in matching]
284287
calls_delete_message_batch = [
285-
mock.call().delete_message_batch(QueueUrl='test', Entries=delete_entries)
288+
mock.call().delete_message_batch(QueueUrl='https://test-queue', Entries=delete_entries)
286289
]
287290
mock_conn.assert_has_calls(calls_delete_message_batch)

tests/providers/google/cloud/hooks/test_datacatalog.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222

2323
import pytest
2424
from google.api_core.retry import Retry
25-
from google.cloud.datacatalog_v1beta1 import CreateTagRequest, CreateTagTemplateRequest
26-
from google.cloud.datacatalog_v1beta1.types import Entry, Tag, TagTemplate
25+
from google.cloud.datacatalog import CreateTagRequest, CreateTagTemplateRequest, Entry, Tag, TagTemplate
2726

2827
from airflow import AirflowException
2928
from airflow.providers.google.cloud.hooks.datacatalog import CloudDataCatalogHook

0 commit comments

Comments
 (0)