|
23 | 23 | import pytest
|
24 | 24 | from google.api_core.exceptions import AlreadyExists, NotFound
|
25 | 25 | from google.api_core.retry import Retry
|
26 |
| -from google.cloud.dataproc_v1 import Batch |
| 26 | +from google.cloud.dataproc_v1 import Batch, JobStatus |
27 | 27 |
|
28 | 28 | from airflow.exceptions import (
|
29 | 29 | AirflowException,
|
@@ -1058,6 +1058,32 @@ def test_execute_deferrable(self, mock_trigger_hook, mock_hook):
|
1058 | 1058 | assert isinstance(exc.value.trigger, DataprocSubmitTrigger)
|
1059 | 1059 | assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
|
1060 | 1060 |
|
| 1061 | + @mock.patch(DATAPROC_PATH.format("DataprocHook")) |
| 1062 | + @mock.patch("airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator.defer") |
| 1063 | + @mock.patch("airflow.providers.google.cloud.operators.dataproc.DataprocHook.submit_job") |
| 1064 | + def test_dataproc_operator_execute_async_done_before_defer(self, mock_submit_job, mock_defer, mock_hook): |
| 1065 | + mock_submit_job.return_value.reference.job_id = TEST_JOB_ID |
| 1066 | + job_status = mock_hook.return_value.get_job.return_value.status |
| 1067 | + job_status.state = JobStatus.State.DONE |
| 1068 | + |
| 1069 | + op = DataprocSubmitJobOperator( |
| 1070 | + task_id=TASK_ID, |
| 1071 | + region=GCP_REGION, |
| 1072 | + project_id=GCP_PROJECT, |
| 1073 | + job={}, |
| 1074 | + gcp_conn_id=GCP_CONN_ID, |
| 1075 | + retry=RETRY, |
| 1076 | + asynchronous=True, |
| 1077 | + timeout=TIMEOUT, |
| 1078 | + metadata=METADATA, |
| 1079 | + request_id=REQUEST_ID, |
| 1080 | + impersonation_chain=IMPERSONATION_CHAIN, |
| 1081 | + deferrable=True, |
| 1082 | + ) |
| 1083 | + |
| 1084 | + op.execute(context=self.mock_context) |
| 1085 | + assert not mock_defer.called |
| 1086 | + |
1061 | 1087 | @mock.patch(DATAPROC_PATH.format("DataprocHook"))
|
1062 | 1088 | def test_on_kill(self, mock_hook):
|
1063 | 1089 | job = {}
|
|
0 commit comments