Skip to content

Commit

Permalink
Fix the logic of checking dataflow job state (#34785)
Browse files Browse the repository at this point in the history
Co-authored-by: Elad Kalif <[email protected]>
  • Loading branch information
champon1020 and eladkal committed Nov 10, 2023
1 parent 3cb0870 commit 8fd5ac6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/hooks/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ def _check_dataflow_job_state(self, job) -> bool:
"JOB_STATE_DRAINED while it is a batch job"
)

if not self._wait_until_finished and current_state == self._expected_terminal_state:
if current_state == self._expected_terminal_state:
if self._expected_terminal_state == DataflowJobStatus.JOB_STATE_RUNNING:
return not self._wait_until_finished
return True

if current_state in DataflowJobStatus.AWAITING_STATES:
Expand Down
4 changes: 4 additions & 0 deletions tests/providers/google/cloud/hooks/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,10 @@ def test_check_dataflow_job_state_wait_until_finished(
@pytest.mark.parametrize(
"job_state, wait_until_finished, expected_result",
[
# DONE
(DataflowJobStatus.JOB_STATE_DONE, None, True),
(DataflowJobStatus.JOB_STATE_DONE, True, True),
(DataflowJobStatus.JOB_STATE_DONE, False, True),
# RUNNING
(DataflowJobStatus.JOB_STATE_RUNNING, None, False),
(DataflowJobStatus.JOB_STATE_RUNNING, True, False),
Expand Down

0 comments on commit 8fd5ac6

Please sign in to comment.