Skip to content

Commit 540a076

Browse files
Lee-Wuranusjr
and authored
merge BigQueryTableExistenceAsyncSensor into BigQueryTableExistenceSensor (#30235)
* feat(providers/google): move the async execution logic from BigQueryTableExistenceAsyncSensor to BigQueryTableExistenceSensor * docs(providers/google): update the doc for BigQueryTableExistenceSensor deferrable mode and BigQueryTableExistenceAsyncSensor deprecation * test(providers/google): add test cases for BigQueryTableExistenceSensor when its deferrable attribute is set to True * refactor(providers/google): deprecate poll_interval argument --------- Co-authored-by: Tzu-ping Chung <[email protected]>
1 parent f08296d commit 540a076

File tree

4 files changed

+164
-67
lines changed

4 files changed

+164
-67
lines changed

airflow/providers/google/cloud/sensors/bigquery.py

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,22 @@ def __init__(
7575
gcp_conn_id: str = "google_cloud_default",
7676
delegate_to: str | None = None,
7777
impersonation_chain: str | Sequence[str] | None = None,
78+
deferrable: bool = False,
7879
**kwargs,
7980
) -> None:
81+
if deferrable and "poke_interval" not in kwargs:
82+
# TODO: Remove once deprecated
83+
if "polling_interval" in kwargs:
84+
kwargs["poke_interval"] = kwargs["polling_interval"]
85+
warnings.warn(
86+
"Argument `poll_interval` is deprecated and will be removed "
87+
"in a future release. Please use `poke_interval` instead.",
88+
DeprecationWarning,
89+
stacklevel=2,
90+
)
91+
else:
92+
kwargs["poke_interval"] = 5
93+
8094
super().__init__(**kwargs)
8195

8296
self.project_id = project_id
@@ -90,6 +104,8 @@ def __init__(
90104
self.delegate_to = delegate_to
91105
self.impersonation_chain = impersonation_chain
92106

107+
self.deferrable = deferrable
108+
93109
def poke(self, context: Context) -> bool:
94110
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
95111
self.log.info("Sensor checks existence of table: %s", table_uri)
@@ -102,6 +118,38 @@ def poke(self, context: Context) -> bool:
102118
project_id=self.project_id, dataset_id=self.dataset_id, table_id=self.table_id
103119
)
104120

121+
def execute(self, context: Context) -> None:
122+
"""Airflow runs this method on the worker and defers using the trigger."""
123+
self.defer(
124+
timeout=timedelta(seconds=self.timeout),
125+
trigger=BigQueryTableExistenceTrigger(
126+
dataset_id=self.dataset_id,
127+
table_id=self.table_id,
128+
project_id=self.project_id,
129+
poll_interval=self.poke_interval,
130+
gcp_conn_id=self.gcp_conn_id,
131+
hook_params={
132+
"delegate_to": self.delegate_to,
133+
"impersonation_chain": self.impersonation_chain,
134+
},
135+
),
136+
method_name="execute_complete",
137+
)
138+
139+
def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
140+
"""
141+
Callback for when the trigger fires - returns immediately.
142+
Relies on trigger to throw an exception, otherwise it assumes execution was
143+
successful.
144+
"""
145+
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
146+
self.log.info("Sensor checks existence of table: %s", table_uri)
147+
if event:
148+
if event["status"] == "success":
149+
return event["message"]
150+
raise AirflowException(event["message"])
151+
raise AirflowException("No event received in trigger callback")
152+
105153

106154
class BigQueryTablePartitionExistenceSensor(BaseSensorOperator):
107155
"""
@@ -249,47 +297,15 @@ class BigQueryTableExistenceAsyncSensor(BigQueryTableExistenceSensor):
249297
:param polling_interval: The interval in seconds to wait between checks table existence.
250298
"""
251299

252-
def __init__(
253-
self,
254-
gcp_conn_id: str = "google_cloud_default",
255-
polling_interval: float = 5.0,
256-
**kwargs: Any,
257-
) -> None:
258-
super().__init__(**kwargs)
259-
self.polling_interval = polling_interval
260-
self.gcp_conn_id = gcp_conn_id
261-
262-
def execute(self, context: Context) -> None:
263-
"""Airflow runs this method on the worker and defers using the trigger."""
264-
self.defer(
265-
timeout=timedelta(seconds=self.timeout),
266-
trigger=BigQueryTableExistenceTrigger(
267-
dataset_id=self.dataset_id,
268-
table_id=self.table_id,
269-
project_id=self.project_id,
270-
poll_interval=self.polling_interval,
271-
gcp_conn_id=self.gcp_conn_id,
272-
hook_params={
273-
"delegate_to": self.delegate_to,
274-
"impersonation_chain": self.impersonation_chain,
275-
},
276-
),
277-
method_name="execute_complete",
300+
def __init__(self, **kwargs):
301+
warnings.warn(
302+
"Class `BigQueryTableExistenceAsyncSensor` is deprecated and "
303+
"will be removed in a future release. "
304+
"Please use `BigQueryTableExistenceSensor` and "
305+
"set `deferrable` attribute to `True` instead",
306+
DeprecationWarning,
278307
)
279-
280-
def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
281-
"""
282-
Callback for when the trigger fires - returns immediately.
283-
Relies on trigger to throw an exception, otherwise it assumes execution was
284-
successful.
285-
"""
286-
table_uri = f"{self.project_id}:{self.dataset_id}.{self.table_id}"
287-
self.log.info("Sensor checks existence of table: %s", table_uri)
288-
if event:
289-
if event["status"] == "success":
290-
return event["message"]
291-
raise AirflowException(event["message"])
292-
raise AirflowException("No event received in trigger callback")
308+
super().__init__(deferrable=True, **kwargs)
293309

294310

295311
class BigQueryTableExistencePartitionAsyncSensor(BigQueryTablePartitionExistenceSensor):

docs/apache-airflow-providers-google/operators/cloud/bigquery.rst

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -484,10 +484,15 @@ use the ``{{ ds_nodash }}`` macro as the table name suffix.
484484
:start-after: [START howto_sensor_bigquery_table]
485485
:end-before: [END howto_sensor_bigquery_table]
486486

487-
Use the :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceAsyncSensor`
488-
(deferrable version) if you would like to free up the worker slots while the sensor is running.
487+
You can also use the deferrable mode in this operator if you would like to free up the worker slot while the sensor is running.
488+
489+
.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
490+
:language: python
491+
:dedent: 4
492+
:start-after: [START howto_sensor_bigquery_table_defered]
493+
:end-before: [END howto_sensor_bigquery_table_defered]
489494

490-
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceAsyncSensor`.
495+
:class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceAsyncSensor` is deprecated and will be removed in a future release. Please use :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceSensor` with its deferrable mode enabled instead.
491496

492497
.. exampleinclude:: /../../tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
493498
:language: python

tests/providers/google/cloud/sensors/test_bigquery.py

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,61 @@ def test_passing_arguments_to_hook(self, mock_hook):
6767
project_id=TEST_PROJECT_ID, dataset_id=TEST_DATASET_ID, table_id=TEST_TABLE_ID
6868
)
6969

70+
def test_execute_defered(self):
71+
"""
72+
Asserts that a task is deferred and a BigQueryTableExistenceTrigger will be fired
73+
when the BigQueryTableExistenceAsyncSensor is executed.
74+
"""
75+
task = BigQueryTableExistenceSensor(
76+
task_id="check_table_exists",
77+
project_id=TEST_PROJECT_ID,
78+
dataset_id=TEST_DATASET_ID,
79+
table_id=TEST_TABLE_ID,
80+
deferrable=True,
81+
)
82+
with pytest.raises(TaskDeferred) as exc:
83+
task.execute(context={})
84+
assert isinstance(
85+
exc.value.trigger, BigQueryTableExistenceTrigger
86+
), "Trigger is not a BigQueryTableExistenceTrigger"
87+
88+
def test_excute_defered_failure(self):
89+
"""Tests that an AirflowException is raised in case of error event"""
90+
task = BigQueryTableExistenceSensor(
91+
task_id="task-id",
92+
project_id=TEST_PROJECT_ID,
93+
dataset_id=TEST_DATASET_ID,
94+
table_id=TEST_TABLE_ID,
95+
deferrable=True,
96+
)
97+
with pytest.raises(AirflowException):
98+
task.execute_complete(context={}, event={"status": "error", "message": "test failure message"})
99+
100+
def test_execute_complete(self):
101+
"""Asserts that logging occurs as expected"""
102+
task = BigQueryTableExistenceSensor(
103+
task_id="task-id",
104+
project_id=TEST_PROJECT_ID,
105+
dataset_id=TEST_DATASET_ID,
106+
table_id=TEST_TABLE_ID,
107+
deferrable=True,
108+
)
109+
table_uri = f"{TEST_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
110+
with mock.patch.object(task.log, "info") as mock_log_info:
111+
task.execute_complete(context={}, event={"status": "success", "message": "Job completed"})
112+
mock_log_info.assert_called_with("Sensor checks existence of table: %s", table_uri)
113+
114+
def test_execute_defered_complete_event_none(self):
115+
"""Asserts that logging occurs as expected"""
116+
task = BigQueryTableExistenceSensor(
117+
task_id="task-id",
118+
project_id=TEST_PROJECT_ID,
119+
dataset_id=TEST_DATASET_ID,
120+
table_id=TEST_TABLE_ID,
121+
)
122+
with pytest.raises(AirflowException):
123+
task.execute_complete(context={}, event=None)
124+
70125

71126
class TestBigqueryTablePartitionExistenceSensor:
72127
@mock.patch("airflow.providers.google.cloud.sensors.bigquery.BigQueryHook")
@@ -171,17 +226,25 @@ def context():
171226

172227

173228
class TestBigQueryTableExistenceAsyncSensor:
229+
depcrecation_message = (
230+
"Class `BigQueryTableExistenceAsyncSensor` is deprecated and "
231+
"will be removed in a future release. "
232+
"Please use `BigQueryTableExistenceSensor` and "
233+
"set `deferrable` attribute to `True` instead"
234+
)
235+
174236
def test_big_query_table_existence_sensor_async(self):
175237
"""
176238
Asserts that a task is deferred and a BigQueryTableExistenceTrigger will be fired
177239
when the BigQueryTableExistenceAsyncSensor is executed.
178240
"""
179-
task = BigQueryTableExistenceAsyncSensor(
180-
task_id="check_table_exists",
181-
project_id=TEST_PROJECT_ID,
182-
dataset_id=TEST_DATASET_ID,
183-
table_id=TEST_TABLE_ID,
184-
)
241+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
242+
task = BigQueryTableExistenceAsyncSensor(
243+
task_id="check_table_exists",
244+
project_id=TEST_PROJECT_ID,
245+
dataset_id=TEST_DATASET_ID,
246+
table_id=TEST_TABLE_ID,
247+
)
185248
with pytest.raises(TaskDeferred) as exc:
186249
task.execute(context={})
187250
assert isinstance(
@@ -190,36 +253,39 @@ def test_big_query_table_existence_sensor_async(self):
190253

191254
def test_big_query_table_existence_sensor_async_execute_failure(self):
192255
"""Tests that an AirflowException is raised in case of error event"""
193-
task = BigQueryTableExistenceAsyncSensor(
194-
task_id="task-id",
195-
project_id=TEST_PROJECT_ID,
196-
dataset_id=TEST_DATASET_ID,
197-
table_id=TEST_TABLE_ID,
198-
)
256+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
257+
task = BigQueryTableExistenceAsyncSensor(
258+
task_id="task-id",
259+
project_id=TEST_PROJECT_ID,
260+
dataset_id=TEST_DATASET_ID,
261+
table_id=TEST_TABLE_ID,
262+
)
199263
with pytest.raises(AirflowException):
200264
task.execute_complete(context={}, event={"status": "error", "message": "test failure message"})
201265

202266
def test_big_query_table_existence_sensor_async_execute_complete(self):
203267
"""Asserts that logging occurs as expected"""
204-
task = BigQueryTableExistenceAsyncSensor(
205-
task_id="task-id",
206-
project_id=TEST_PROJECT_ID,
207-
dataset_id=TEST_DATASET_ID,
208-
table_id=TEST_TABLE_ID,
209-
)
268+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
269+
task = BigQueryTableExistenceAsyncSensor(
270+
task_id="task-id",
271+
project_id=TEST_PROJECT_ID,
272+
dataset_id=TEST_DATASET_ID,
273+
table_id=TEST_TABLE_ID,
274+
)
210275
table_uri = f"{TEST_PROJECT_ID}:{TEST_DATASET_ID}.{TEST_TABLE_ID}"
211276
with mock.patch.object(task.log, "info") as mock_log_info:
212277
task.execute_complete(context={}, event={"status": "success", "message": "Job completed"})
213278
mock_log_info.assert_called_with("Sensor checks existence of table: %s", table_uri)
214279

215280
def test_big_query_sensor_async_execute_complete_event_none(self):
216281
"""Asserts that logging occurs as expected"""
217-
task = BigQueryTableExistenceAsyncSensor(
218-
task_id="task-id",
219-
project_id=TEST_PROJECT_ID,
220-
dataset_id=TEST_DATASET_ID,
221-
table_id=TEST_TABLE_ID,
222-
)
282+
with pytest.warns(DeprecationWarning, match=self.depcrecation_message):
283+
task = BigQueryTableExistenceAsyncSensor(
284+
task_id="task-id",
285+
project_id=TEST_PROJECT_ID,
286+
dataset_id=TEST_DATASET_ID,
287+
table_id=TEST_TABLE_ID,
288+
)
223289
with pytest.raises(AirflowException):
224290
task.execute_complete(context={}, event=None)
225291

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@
8989
)
9090
# [END howto_sensor_bigquery_table]
9191

92+
# [START howto_sensor_bigquery_table_defered]
93+
check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
94+
task_id="check_table_exists_defered",
95+
project_id=PROJECT_ID,
96+
dataset_id=DATASET_NAME,
97+
table_id=TABLE_NAME,
98+
deferrable=True,
99+
)
100+
# [END howto_sensor_bigquery_table_defered]
101+
92102
# [START howto_sensor_async_bigquery_table]
93103
check_table_exists_async = BigQueryTableExistenceAsyncSensor(
94104
task_id="check_table_exists_async",

0 commit comments

Comments
 (0)