Refactor: consolidate import time in providers (#34402)
eumiro committed Oct 4, 2023
1 parent 99eeb84 commit 99f3203
Showing 18 changed files with 44 additions and 44 deletions.
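
The change is the same in every file: drop the function-level imports (from time import sleep, and in two files from time import monotonic / from time import time) in favor of a single import time, and call through the module instead. There is no behavior change. A minimal before/after sketch of the pattern (the poll helper is illustrative, not code from this commit):

    import time

    def poll(interval: float) -> None:
        # After: the call names the module explicitly, and tests can
        # patch time.sleep through the module attribute.
        time.sleep(interval)

    # Before, a module would do:
    #     from time import sleep
    #     ...
    #     sleep(interval)
    # which copies `sleep` into the importing module's namespace, so tests
    # had to patch "<that_module>.sleep" separately for every module.

The es_task_handler.py change below also removes a name collision: from time import time had bound the name "time" to the function, hiding the module itself.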
4 changes: 2 additions & 2 deletions airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
@@ -17,8 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import time
 from functools import cached_property
-from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
 from deprecated.classic import deprecated
@@ -78,7 +78,7 @@ def poll_for_termination(self, app_id: str) -> None:
         state = self.hook.get_spark_state(app_id)
         while AppState(state) not in AnalyticDBSparkHook.TERMINAL_STATES:
             self.log.debug("Application with id %s is in state: %s", app_id, state)
-            sleep(self.polling_interval)
+            time.sleep(self.polling_interval)
             state = self.hook.get_spark_state(app_id)
         self.log.info("Application with id %s terminated with state: %s", app_id, state)
         self.log.info(
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/batch_client.py
@@ -28,7 +28,7 @@
 
 import itertools
 import random
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Callable
 
 import botocore.client
@@ -549,7 +549,7 @@ def delay(delay: int | float | None = None) -> None:
             delay = random.uniform(BatchClientHook.DEFAULT_DELAY_MIN, BatchClientHook.DEFAULT_DELAY_MAX)
         else:
             delay = BatchClientHook.add_jitter(delay)
-        sleep(delay)
+        time.sleep(delay)
 
     @staticmethod
     def exponential_delay(tries: int) -> float:
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from time import sleep
+import time
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
@@ -160,7 +160,7 @@ def wait_for_availability(
 
             self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time)
 
-            sleep(sleep_time)
+            time.sleep(sleep_time)
 
             sleep_time *= exponential_back_off_factor
 
@@ -240,7 +240,7 @@ def wait_for_deletion(
 
             self.log.info("Poke retry %s. Sleep time %s seconds. Sleeping...", num_tries, sleep_time)
 
-            sleep(sleep_time)
+            time.sleep(sleep_time)
 
             sleep_time *= exponential_back_off_factor
 
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/emr.py
@@ -18,8 +18,8 @@
 from __future__ import annotations
 
 import json
+import time
 import warnings
-from time import sleep
 from typing import Any
 
 from botocore.exceptions import ClientError
@@ -509,7 +509,7 @@ def poll_query_status(
                 final_query_state = query_state
                 break
             try_number += 1
-            sleep(poll_interval)
+            time.sleep(poll_interval)
         return final_query_state
 
     def stop_query(self, job_id: str) -> dict:
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/redshift_data.py
@@ -17,8 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import time
 from pprint import pformat
-from time import sleep
 from typing import TYPE_CHECKING, Any, Iterable
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
@@ -127,7 +127,7 @@ def wait_for_results(self, statement_id, poll_interval):
                 )
             else:
                 self.log.info("Query %s", status)
-            sleep(poll_interval)
+            time.sleep(poll_interval)
 
     def get_table_primary_key(
         self,
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/hooks/s3.py
@@ -26,6 +26,7 @@
 import os
 import re
 import shutil
+import time
 import warnings
 from contextlib import suppress
 from copy import deepcopy
@@ -35,7 +36,6 @@
 from io import BytesIO
 from pathlib import Path
 from tempfile import NamedTemporaryFile, gettempdir
-from time import sleep
 from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
 from urllib.parse import urlsplit
 from uuid import uuid4
@@ -1289,7 +1289,7 @@ def delete_bucket(self, bucket_name: str, force_delete: bool = False, max_retrie
                 if not bucket_keys:
                     break
                 if retry:  # Avoid first loop
-                    sleep(500)
+                    time.sleep(500)
 
                 self.delete_objects(bucket=bucket_name, keys=bucket_keys)
 
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/operators/appflow.py
@@ -16,10 +16,10 @@
 # under the License.
 from __future__ import annotations
 
+import time
 import warnings
 from datetime import datetime, timedelta
 from functools import cached_property
-from time import sleep
 from typing import TYPE_CHECKING, cast
 
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
@@ -107,7 +107,7 @@ def execute(self, context: Context) -> None:
             self._update_flow()
             # while schedule flows will pick up the update right away, on-demand flows might use out of date
             # info if triggered right after an update, so we need to wait a bit for the DB to be consistent.
-            sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)
+            time.sleep(AppflowBaseOperator.UPDATE_PROPAGATION_TIME)
 
         self._run_flow(context)
 
4 changes: 2 additions & 2 deletions airflow/providers/apache/livy/operators/livy.py
@@ -17,8 +17,8 @@
 """This module contains the Apache Livy operator."""
 from __future__ import annotations
 
+import time
 from functools import cached_property
-from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
 from deprecated.classic import deprecated
@@ -189,7 +189,7 @@ def poll_for_termination(self, batch_id: int | str) -> None:
         state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args)
         while state not in self.hook.TERMINAL_STATES:
             self.log.debug("Batch with id %s is in state: %s", batch_id, state.value)
-            sleep(self._polling_interval)
+            time.sleep(self._polling_interval)
             state = self.hook.get_batch_state(batch_id, retry_args=self.retry_args)
         self.log.info("Batch with id %s terminated with state: %s", batch_id, state.value)
         self.hook.dump_batch_logs(batch_id)
4 changes: 2 additions & 2 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -21,10 +21,10 @@
 import inspect
 import logging
 import sys
+import time
 import warnings
 from collections import defaultdict
 from operator import attrgetter
-from time import time
 from typing import TYPE_CHECKING, Any, Callable, List, Tuple
 from urllib.parse import quote, urlparse
 
@@ -372,7 +372,7 @@ def es_read(self, log_id: str, offset: int | str, metadata: dict) -> list | Elas
 
     def emit(self, record):
         if self.handler:
-            setattr(record, self.offset_field, int(time() * (10**9)))
+            setattr(record, self.offset_field, int(time.time() * (10**9)))
             self.handler.emit(record)
 
     def set_context(self, ti: TaskInstance) -> None:
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/hooks/cloud_batch.py
@@ -19,7 +19,7 @@
 
 import itertools
 import json
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Iterable, Sequence
 
 from google.cloud.batch import ListJobsRequest, ListTasksRequest
@@ -152,7 +152,7 @@ def wait_for_job(
                 ):
                     return job
                 else:
-                    sleep(polling_period_seconds)
+                    time.sleep(polling_period_seconds)
             except Exception as e:
                 self.log.exception("Exception occurred while checking for job completion.")
                 raise e
12 changes: 6 additions & 6 deletions airflow/providers/google/cloud/hooks/datafusion.py
@@ -20,7 +20,7 @@
 import asyncio
 import json
 import os
-from time import monotonic, sleep
+import time
 from typing import Any, Dict, Sequence
 from urllib.parse import quote, urlencode, urljoin
 
@@ -91,7 +91,7 @@ def __init__(
     def wait_for_operation(self, operation: dict[str, Any]) -> dict[str, Any]:
         """Waits for long-lasting operation to complete."""
         for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-            sleep(time_to_wait)
+            time.sleep(time_to_wait)
             operation = (
                 self.get_conn().projects().locations().operations().get(name=operation.get("name")).execute()
             )
@@ -115,9 +115,9 @@ def wait_for_pipeline_state(
         """Polls pipeline state and raises an exception if the state fails or times out."""
         failure_states = failure_states or FAILURE_STATES
         success_states = success_states or SUCCESS_STATES
-        start_time = monotonic()
+        start_time = time.monotonic()
         current_state = None
-        while monotonic() - start_time < timeout:
+        while time.monotonic() - start_time < timeout:
             try:
                 workflow = self.get_pipeline_workflow(
                     pipeline_name=pipeline_name,
@@ -135,7 +135,7 @@
                 raise AirflowException(
                     f"Pipeline {pipeline_name} state {current_state} is not one of {success_states}"
                 )
-            sleep(30)
+            time.sleep(30)
 
         # Time is up!
         raise AirflowException(
@@ -393,7 +393,7 @@ def delete_pipeline(
                 )
             except ConflictException as exc:
                 self.log.info(exc)
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
             else:
                 if response.status == 200:
                     break
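
The DataFusion hook above is the fullest example of the polling idiom this commit touches: a time.monotonic() deadline with a fixed time.sleep() between polls. A stripped-down sketch of that loop, with a caller-supplied get_state callable standing in for the real pipeline lookup (the name and signature are illustrative, not the hook's API):

    import time

    def wait_for_state(get_state, success_states, timeout=300, poll_interval=30):
        # Same shape as wait_for_pipeline_state: monotonic deadline,
        # fixed sleep between polls, an exception once time runs out.
        start_time = time.monotonic()
        current_state = None
        while time.monotonic() - start_time < timeout:
            current_state = get_state()
            if current_state in success_states:
                return current_state
            time.sleep(poll_interval)
        raise TimeoutError(f"state {current_state!r} not reached within {timeout}s")

time.monotonic() is the right clock here because it never jumps backwards when the wall clock is adjusted, so the deadline arithmetic stays correct; time.time() offers no such guarantee.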
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/datafusion.py
@@ -17,7 +17,7 @@
 """This module contains Google DataFusion operators."""
 from __future__ import annotations
 
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Any, Sequence
 
 from google.api_core.retry import exponential_sleep_generator
@@ -267,7 +267,7 @@ def execute(self, context: Context) -> dict:
             for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
                 if instance["state"] != "CREATING":
                     break
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
                 instance = hook.get_instance(
                     instance_name=self.instance_name, location=self.location, project_id=self.project_id
                 )
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/operators/dataplex.py
@@ -18,7 +18,7 @@
 
 from __future__ import annotations
 
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.exceptions import AirflowException
@@ -165,7 +165,7 @@ def execute(self, context: Context) -> dict:
                 )
                 if task["state"] != "CREATING":
                     break
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
 
         return Task.to_dict(task)
 
@@ -534,7 +534,7 @@ def execute(self, context: Context) -> dict:
                 )
                 if lake["state"] != "CREATING":
                     break
-                sleep(time_to_wait)
+                time.sleep(time_to_wait)
         DataplexLakeLink.persist(
             context=context,
             task_instance=self,
@@ -18,7 +18,7 @@
 """This module contains Google Dataproc Metastore operators."""
 from __future__ import annotations
 
-from time import sleep
+import time
 from typing import TYPE_CHECKING, Sequence
 
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
@@ -700,7 +700,7 @@ def _wait_for_export_metadata(self, hook: DataprocMetastoreHook):
         the SDK.
         """
         for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-            sleep(time_to_wait)
+            time.sleep(time_to_wait)
             service = hook.get_service(
                 region=self.region,
                 project_id=self.project_id,
@@ -986,7 +986,7 @@ def _wait_for_restore_service(self, hook: DataprocMetastoreHook):
         the SDK.
         """
         for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
-            sleep(time_to_wait)
+            time.sleep(time_to_wait)
             service = hook.get_service(
                 region=self.region,
                 project_id=self.project_id,
@@ -18,8 +18,8 @@
 from __future__ import annotations
 
 import re
+import time
 from collections import namedtuple
-from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
 from azure.mgmt.containerinstance.models import (
@@ -348,7 +348,7 @@ def _monitor_logging(self, resource_group: str, name: str) -> int:
             except Exception:
                 self.log.exception("Exception while getting container groups")
 
-            sleep(1)
+            time.sleep(1)
 
     def _log_last(self, logs: list | None, last_line_logged: Any) -> Any | None:
         if logs:
8 changes: 4 additions & 4 deletions tests/providers/amazon/aws/hooks/test_batch_client.py
@@ -427,7 +427,7 @@ def test_add_jitter(self):
         assert result <= width
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_defaults(self, mock_sleep, mock_uniform):
         assert BatchClientHook.DEFAULT_DELAY_MIN == 1
         assert BatchClientHook.DEFAULT_DELAY_MAX == 10
@@ -439,21 +439,21 @@ def test_delay_defaults(self, mock_sleep, mock_uniform):
         mock_sleep.assert_called_once_with(0)
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_with_zero(self, mock_sleep, mock_uniform):
         self.batch_client.delay(0)
         mock_uniform.assert_called_once_with(0, 1)  # in add_jitter
         mock_sleep.assert_called_once_with(mock_uniform.return_value)
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_with_int(self, mock_sleep, mock_uniform):
         self.batch_client.delay(5)
         mock_uniform.assert_called_once_with(4, 6)  # in add_jitter
         mock_sleep.assert_called_once_with(mock_uniform.return_value)
 
     @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.random.uniform")
-    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.sleep")
+    @mock.patch("airflow.providers.amazon.aws.hooks.batch_client.time.sleep")
     def test_delay_with_float(self, mock_sleep, mock_uniform):
         self.batch_client.delay(5.0)
         mock_uniform.assert_called_once_with(4.0, 6.0)  # in add_jitter
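
The test updates fall directly out of how unittest.mock.patch resolves its target string: it imports the module named by the dotted path and replaces the final attribute. While batch_client.py did "from time import sleep", the bound name lived in that module and tests had to patch "airflow.providers.amazon.aws.hooks.batch_client.sleep"; with a plain "import time", the module only holds a reference to the time module, and the new target "airflow.providers.amazon.aws.hooks.batch_client.time.sleep" resolves through that reference to time.sleep itself. A minimal self-contained sketch of the same mechanics (the delay helper is illustrative, not the hook's code):

    import time
    from unittest import mock

    def delay(seconds: float) -> None:
        # Module-style call, as in BatchClientHook.delay after this commit.
        time.sleep(seconds)

    with mock.patch("time.sleep") as mock_sleep:
        delay(5)
        mock_sleep.assert_called_once_with(5)

One side effect worth knowing: since the module's "time" attribute is the global time module, patching through it replaces time.sleep for every caller while the patch is active, not just for batch_client — harmless in these tests, but broader than patching a name bound by "from time import sleep".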
@@ -345,7 +345,7 @@ def test_execute_fails_with_incorrect_restart_policy(self, aci_mock):
         )
 
     @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
-    @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.sleep")
+    @mock.patch("airflow.providers.microsoft.azure.operators.container_instances.time.sleep")
     def test_execute_correct_sleep_cycle(self, sleep_mock, aci_mock):
         expected_cg1 = make_mock_container(state="Running", exit_code=0, detail_status="test")
         expected_cg2 = make_mock_container(state="Terminated", exit_code=0, detail_status="test")
