Skip to content

Commit

Permalink
[AIRFLOW-6730] Use total_seconds instead of seconds (#7363)
Browse files Browse the repository at this point in the history
* [AIRFLOW-6730] Use total_seconds instead of seconds

* adds tests and fixes types issue

* fix test
  • Loading branch information
saguziel committed Feb 28, 2020
1 parent ee16d30 commit 008b4ba
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 9 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def is_alive(self, grace_multiplier=2.1):
"""
return (
self.state == State.RUNNING and
(timezone.utcnow() - self.latest_heartbeat).seconds < self.heartrate * grace_multiplier
(timezone.utcnow() - self.latest_heartbeat).total_seconds() < self.heartrate * grace_multiplier
)

@provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ def is_alive(self, grace_multiplier=None):
scheduler_health_check_threshold = conf.getint('scheduler', 'scheduler_health_check_threshold')
return (
self.state == State.RUNNING and
(timezone.utcnow() - self.latest_heartbeat).seconds < scheduler_health_check_threshold
(timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold
)

@provide_session
Expand Down
2 changes: 1 addition & 1 deletion airflow/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def run_pod(
if resp.status.start_time is None:
while self.pod_not_started(pod):
delta = dt.now() - curr_time
if delta.seconds >= startup_timeout:
if delta.total_seconds() >= startup_timeout:
raise AirflowException("Pod took too long to start")
time.sleep(1)
self.log.debug('Pod not yet started')
Expand Down
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def _get_init_action_timeout(self):
return self.init_action_timeout
elif match.group(2) == "m":
val = float(match.group(1))
return "{}s".format(timedelta(minutes=val).seconds)
return "{}s".format(int(timedelta(minutes=val).total_seconds()))

raise AirflowException(
"DataprocClusterCreateOperator init_action_timeout"
Expand Down Expand Up @@ -621,13 +621,13 @@ def _graceful_decommission_timeout_object(self) -> Optional[Dict]:
timeout = int(match.group(1))
elif match.group(2) == "m":
val = float(match.group(1))
timeout = timedelta(minutes=val).seconds
timeout = int(timedelta(minutes=val).total_seconds())
elif match.group(2) == "h":
val = float(match.group(1))
timeout = timedelta(hours=val).seconds
timeout = int(timedelta(hours=val).total_seconds())
elif match.group(2) == "d":
val = float(match.group(1))
timeout = timedelta(days=val).seconds
timeout = int(timedelta(days=val).total_seconds())

if not timeout:
raise AirflowException(
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/postgres_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ def convert_type(self, value, schema_type):
return time.mktime(value.timetuple())
if isinstance(value, datetime.time):
formated_time = time.strptime(str(value), "%H:%M:%S")
return datetime.timedelta(
return int(datetime.timedelta(
hours=formated_time.tm_hour,
minutes=formated_time.tm_min,
seconds=formated_time.tm_sec).seconds
seconds=formated_time.tm_sec).total_seconds())
if isinstance(value, dict):
return json.dumps(value)
if isinstance(value, Decimal):
Expand Down
5 changes: 5 additions & 0 deletions tests/jobs/test_base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ def test_is_alive(self):
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=21)
self.assertFalse(job.is_alive())

# test because .seconds was used before instead of total_seconds
# internal repr of datetime is (days, seconds)
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
self.assertFalse(job.is_alive())

job.state = State.SUCCESS
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive")
Expand Down
5 changes: 5 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,11 @@ def test_is_alive(self):
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
self.assertFalse(job.is_alive())

# test because .seconds was used before instead of total_seconds
# internal repr of datetime is (days, seconds)
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
self.assertFalse(job.is_alive())

job.state = State.SUCCESS
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive")
Expand Down

0 comments on commit 008b4ba

Please sign in to comment.