@@ -67,6 +67,61 @@ def test_passing_arguments_to_hook(self, mock_hook):
67
67
project_id = TEST_PROJECT_ID , dataset_id = TEST_DATASET_ID , table_id = TEST_TABLE_ID
68
68
)
69
69
70
+ def test_execute_defered (self ):
71
+ """
72
+ Asserts that a task is deferred and a BigQueryTableExistenceTrigger will be fired
73
+ when the BigQueryTableExistenceAsyncSensor is executed.
74
+ """
75
+ task = BigQueryTableExistenceSensor (
76
+ task_id = "check_table_exists" ,
77
+ project_id = TEST_PROJECT_ID ,
78
+ dataset_id = TEST_DATASET_ID ,
79
+ table_id = TEST_TABLE_ID ,
80
+ deferrable = True ,
81
+ )
82
+ with pytest .raises (TaskDeferred ) as exc :
83
+ task .execute (context = {})
84
+ assert isinstance (
85
+ exc .value .trigger , BigQueryTableExistenceTrigger
86
+ ), "Trigger is not a BigQueryTableExistenceTrigger"
87
+
88
+ def test_excute_defered_failure (self ):
89
+ """Tests that an AirflowException is raised in case of error event"""
90
+ task = BigQueryTableExistenceSensor (
91
+ task_id = "task-id" ,
92
+ project_id = TEST_PROJECT_ID ,
93
+ dataset_id = TEST_DATASET_ID ,
94
+ table_id = TEST_TABLE_ID ,
95
+ deferrable = True ,
96
+ )
97
+ with pytest .raises (AirflowException ):
98
+ task .execute_complete (context = {}, event = {"status" : "error" , "message" : "test failure message" })
99
+
100
+ def test_execute_complete (self ):
101
+ """Asserts that logging occurs as expected"""
102
+ task = BigQueryTableExistenceSensor (
103
+ task_id = "task-id" ,
104
+ project_id = TEST_PROJECT_ID ,
105
+ dataset_id = TEST_DATASET_ID ,
106
+ table_id = TEST_TABLE_ID ,
107
+ deferrable = True ,
108
+ )
109
+ table_uri = f"{ TEST_PROJECT_ID } :{ TEST_DATASET_ID } .{ TEST_TABLE_ID } "
110
+ with mock .patch .object (task .log , "info" ) as mock_log_info :
111
+ task .execute_complete (context = {}, event = {"status" : "success" , "message" : "Job completed" })
112
+ mock_log_info .assert_called_with ("Sensor checks existence of table: %s" , table_uri )
113
+
114
+ def test_execute_defered_complete_event_none (self ):
115
+ """Asserts that logging occurs as expected"""
116
+ task = BigQueryTableExistenceSensor (
117
+ task_id = "task-id" ,
118
+ project_id = TEST_PROJECT_ID ,
119
+ dataset_id = TEST_DATASET_ID ,
120
+ table_id = TEST_TABLE_ID ,
121
+ )
122
+ with pytest .raises (AirflowException ):
123
+ task .execute_complete (context = {}, event = None )
124
+
70
125
71
126
class TestBigqueryTablePartitionExistenceSensor :
72
127
@mock .patch ("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook" )
@@ -171,17 +226,25 @@ def context():
171
226
172
227
173
228
class TestBigQueryTableExistenceAsyncSensor :
229
+ depcrecation_message = (
230
+ "Class `BigQueryTableExistenceAsyncSensor` is deprecated and "
231
+ "will be removed in a future release. "
232
+ "Please use `BigQueryTableExistenceSensor` and "
233
+ "set `deferrable` attribute to `True` instead"
234
+ )
235
+
174
236
def test_big_query_table_existence_sensor_async (self ):
175
237
"""
176
238
Asserts that a task is deferred and a BigQueryTableExistenceTrigger will be fired
177
239
when the BigQueryTableExistenceAsyncSensor is executed.
178
240
"""
179
- task = BigQueryTableExistenceAsyncSensor (
180
- task_id = "check_table_exists" ,
181
- project_id = TEST_PROJECT_ID ,
182
- dataset_id = TEST_DATASET_ID ,
183
- table_id = TEST_TABLE_ID ,
184
- )
241
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
242
+ task = BigQueryTableExistenceAsyncSensor (
243
+ task_id = "check_table_exists" ,
244
+ project_id = TEST_PROJECT_ID ,
245
+ dataset_id = TEST_DATASET_ID ,
246
+ table_id = TEST_TABLE_ID ,
247
+ )
185
248
with pytest .raises (TaskDeferred ) as exc :
186
249
task .execute (context = {})
187
250
assert isinstance (
@@ -190,36 +253,39 @@ def test_big_query_table_existence_sensor_async(self):
190
253
191
254
def test_big_query_table_existence_sensor_async_execute_failure (self ):
192
255
"""Tests that an AirflowException is raised in case of error event"""
193
- task = BigQueryTableExistenceAsyncSensor (
194
- task_id = "task-id" ,
195
- project_id = TEST_PROJECT_ID ,
196
- dataset_id = TEST_DATASET_ID ,
197
- table_id = TEST_TABLE_ID ,
198
- )
256
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
257
+ task = BigQueryTableExistenceAsyncSensor (
258
+ task_id = "task-id" ,
259
+ project_id = TEST_PROJECT_ID ,
260
+ dataset_id = TEST_DATASET_ID ,
261
+ table_id = TEST_TABLE_ID ,
262
+ )
199
263
with pytest .raises (AirflowException ):
200
264
task .execute_complete (context = {}, event = {"status" : "error" , "message" : "test failure message" })
201
265
202
266
def test_big_query_table_existence_sensor_async_execute_complete (self ):
203
267
"""Asserts that logging occurs as expected"""
204
- task = BigQueryTableExistenceAsyncSensor (
205
- task_id = "task-id" ,
206
- project_id = TEST_PROJECT_ID ,
207
- dataset_id = TEST_DATASET_ID ,
208
- table_id = TEST_TABLE_ID ,
209
- )
268
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
269
+ task = BigQueryTableExistenceAsyncSensor (
270
+ task_id = "task-id" ,
271
+ project_id = TEST_PROJECT_ID ,
272
+ dataset_id = TEST_DATASET_ID ,
273
+ table_id = TEST_TABLE_ID ,
274
+ )
210
275
table_uri = f"{ TEST_PROJECT_ID } :{ TEST_DATASET_ID } .{ TEST_TABLE_ID } "
211
276
with mock .patch .object (task .log , "info" ) as mock_log_info :
212
277
task .execute_complete (context = {}, event = {"status" : "success" , "message" : "Job completed" })
213
278
mock_log_info .assert_called_with ("Sensor checks existence of table: %s" , table_uri )
214
279
215
280
def test_big_query_sensor_async_execute_complete_event_none (self ):
216
281
"""Asserts that logging occurs as expected"""
217
- task = BigQueryTableExistenceAsyncSensor (
218
- task_id = "task-id" ,
219
- project_id = TEST_PROJECT_ID ,
220
- dataset_id = TEST_DATASET_ID ,
221
- table_id = TEST_TABLE_ID ,
222
- )
282
+ with pytest .warns (DeprecationWarning , match = self .depcrecation_message ):
283
+ task = BigQueryTableExistenceAsyncSensor (
284
+ task_id = "task-id" ,
285
+ project_id = TEST_PROJECT_ID ,
286
+ dataset_id = TEST_DATASET_ID ,
287
+ table_id = TEST_TABLE_ID ,
288
+ )
223
289
with pytest .raises (AirflowException ):
224
290
task .execute_complete (context = {}, event = None )
225
291
0 commit comments