32
32
33
33
DEFAULT_DATE = timezone .datetime (2017 , 1 , 1 )
34
34
35
+ QUEUE_NAME = 'test-queue'
36
+ QUEUE_URL = f'https://{ QUEUE_NAME } '
37
+
35
38
36
39
class TestSQSSensor (unittest .TestCase ):
37
40
def setUp (self ):
38
41
args = {'owner' : 'airflow' , 'start_date' : DEFAULT_DATE }
39
42
40
43
self .dag = DAG ('test_dag_id' , default_args = args )
41
44
self .sensor = SQSSensor (
42
- task_id = 'test_task' , dag = self .dag , sqs_queue = 'test' , aws_conn_id = 'aws_default'
45
+ task_id = 'test_task' , dag = self .dag , sqs_queue = QUEUE_URL , aws_conn_id = 'aws_default'
43
46
)
44
47
45
48
self .mock_context = mock .MagicMock ()
46
49
self .sqs_hook = SQSHook ()
47
50
48
51
@mock_sqs
49
52
def test_poke_success (self ):
50
- self .sqs_hook .create_queue ('test' )
51
- self .sqs_hook .send_message (queue_url = 'test' , message_body = 'hello' )
53
+ self .sqs_hook .create_queue (QUEUE_NAME )
54
+ self .sqs_hook .send_message (queue_url = QUEUE_URL , message_body = 'hello' )
52
55
53
56
result = self .sensor .poke (self .mock_context )
54
57
assert result
@@ -60,7 +63,7 @@ def test_poke_success(self):
60
63
@mock_sqs
61
64
def test_poke_no_message_failed (self ):
62
65
63
- self .sqs_hook .create_queue ('test' )
66
+ self .sqs_hook .create_queue (QUEUE_NAME )
64
67
result = self .sensor .poke (self .mock_context )
65
68
assert not result
66
69
@@ -112,40 +115,40 @@ def test_poke_receive_raise_exception(self, mock_conn):
112
115
@mock .patch .object (SQSHook , 'get_conn' )
113
116
def test_poke_visibility_timeout (self , mock_conn ):
114
117
# Check without visibility_timeout parameter
115
- self .sqs_hook .create_queue ('test' )
116
- self .sqs_hook .send_message (queue_url = 'test' , message_body = 'hello' )
118
+ self .sqs_hook .create_queue (QUEUE_NAME )
119
+ self .sqs_hook .send_message (queue_url = QUEUE_URL , message_body = 'hello' )
117
120
118
121
self .sensor .poke (self .mock_context )
119
122
120
123
calls_receive_message = [
121
- mock .call ().receive_message (QueueUrl = 'test' , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 )
124
+ mock .call ().receive_message (QueueUrl = QUEUE_URL , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 )
122
125
]
123
126
mock_conn .assert_has_calls (calls_receive_message )
124
127
# Check with visibility_timeout parameter
125
128
self .sensor = SQSSensor (
126
129
task_id = 'test_task2' ,
127
130
dag = self .dag ,
128
- sqs_queue = 'test' ,
131
+ sqs_queue = QUEUE_URL ,
129
132
aws_conn_id = 'aws_default' ,
130
133
visibility_timeout = 42 ,
131
134
)
132
135
self .sensor .poke (self .mock_context )
133
136
134
137
calls_receive_message = [
135
138
mock .call ().receive_message (
136
- QueueUrl = 'test' , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 , VisibilityTimeout = 42
139
+ QueueUrl = QUEUE_URL , MaxNumberOfMessages = 5 , WaitTimeSeconds = 1 , VisibilityTimeout = 42
137
140
)
138
141
]
139
142
mock_conn .assert_has_calls (calls_receive_message )
140
143
141
144
@mock_sqs
142
145
def test_poke_message_invalid_filtering (self ):
143
- self .sqs_hook .create_queue ('test' )
144
- self .sqs_hook .send_message (queue_url = 'test' , message_body = 'hello' )
146
+ self .sqs_hook .create_queue (QUEUE_NAME )
147
+ self .sqs_hook .send_message (queue_url = QUEUE_URL , message_body = 'hello' )
145
148
sensor = SQSSensor (
146
149
task_id = 'test_task2' ,
147
150
dag = self .dag ,
148
- sqs_queue = 'test' ,
151
+ sqs_queue = QUEUE_URL ,
149
152
aws_conn_id = 'aws_default' ,
150
153
message_filtering = 'invalid_option' ,
151
154
)
@@ -155,7 +158,7 @@ def test_poke_message_invalid_filtering(self):
155
158
156
159
@mock .patch .object (SQSHook , "get_conn" )
157
160
def test_poke_message_filtering_literal_values (self , mock_conn ):
158
- self .sqs_hook .create_queue ('test' )
161
+ self .sqs_hook .create_queue (QUEUE_NAME )
159
162
matching = [{"id" : 11 , "body" : "a matching message" }]
160
163
non_matching = [{"id" : 12 , "body" : "a non-matching message" }]
161
164
all = matching + non_matching
@@ -188,13 +191,13 @@ def mock_delete_message_batch(**kwargs):
188
191
# Test that only filtered messages are deleted
189
192
delete_entries = [{'Id' : x ['id' ], 'ReceiptHandle' : 100 + x ['id' ]} for x in matching ]
190
193
calls_delete_message_batch = [
191
- mock .call ().delete_message_batch (QueueUrl = 'test' , Entries = delete_entries )
194
+ mock .call ().delete_message_batch (QueueUrl = QUEUE_URL , Entries = delete_entries )
192
195
]
193
196
mock_conn .assert_has_calls (calls_delete_message_batch )
194
197
195
198
@mock .patch .object (SQSHook , "get_conn" )
196
199
def test_poke_message_filtering_jsonpath (self , mock_conn ):
197
- self .sqs_hook .create_queue ('test' )
200
+ self .sqs_hook .create_queue (QUEUE_NAME )
198
201
matching = [
199
202
{"id" : 11 , "key" : {"matches" : [1 , 2 ]}},
200
203
{"id" : 12 , "key" : {"matches" : [3 , 4 , 5 ]}},
@@ -234,13 +237,13 @@ def mock_delete_message_batch(**kwargs):
234
237
# Test that only filtered messages are deleted
235
238
delete_entries = [{'Id' : x ['id' ], 'ReceiptHandle' : 100 + x ['id' ]} for x in matching ]
236
239
calls_delete_message_batch = [
237
- mock .call ().delete_message_batch (QueueUrl = 'test' , Entries = delete_entries )
240
+ mock .call ().delete_message_batch (QueueUrl = QUEUE_URL , Entries = delete_entries )
238
241
]
239
242
mock_conn .assert_has_calls (calls_delete_message_batch )
240
243
241
244
@mock .patch .object (SQSHook , "get_conn" )
242
245
def test_poke_message_filtering_jsonpath_values (self , mock_conn ):
243
- self .sqs_hook .create_queue ('test' )
246
+ self .sqs_hook .create_queue (QUEUE_NAME )
244
247
matching = [
245
248
{"id" : 11 , "key" : {"matches" : [1 , 2 ]}},
246
249
{"id" : 12 , "key" : {"matches" : [1 , 4 , 5 ]}},
@@ -282,6 +285,6 @@ def mock_delete_message_batch(**kwargs):
282
285
# Test that only filtered messages are deleted
283
286
delete_entries = [{'Id' : x ['id' ], 'ReceiptHandle' : 100 + x ['id' ]} for x in matching ]
284
287
calls_delete_message_batch = [
285
- mock .call ().delete_message_batch (QueueUrl = 'test' , Entries = delete_entries )
288
+ mock .call ().delete_message_batch (QueueUrl = 'https:// test-queue ' , Entries = delete_entries )
286
289
]
287
290
mock_conn .assert_has_calls (calls_delete_message_batch )
0 commit comments