Change retry type for Google Dataflow Client to async one (#36141)
Google Dataflow Client 0.8.6 shipped a bugfix that changed the retry
type accepted by its async client to AsyncRetry. This caused our
canary builds to fail.

We switch the async hooks and operators to AsyncRetry and bump the
minimum version of the client to 0.8.6.
potiuk committed Dec 9, 2023
1 parent 7b6cadd · commit 8d0c5d9
Showing 9 changed files with 51 additions and 45 deletions.
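
Every changed file follows the same pattern: async hook and operator methods that previously annotated their retry parameter as google.api_core.retry.Retry now use google.api_core.retry_async.AsyncRetry, and the minimum pinned versions of the affected client libraries are raised to releases that accept the async type. As a minimal sketch of the distinction (the retry settings below are illustrative, not taken from the provider code):

from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry

# Retry targets synchronous gapic clients; AsyncRetry is the
# counterpart expected by their *Async clients. Dataflow Client 0.8.6
# changed its async surface to the async type, so passing a plain
# Retry there no longer matches.
sync_retry = Retry(initial=10.0, maximum=60.0, multiplier=1.0)
async_retry = AsyncRetry(initial=10.0, maximum=60.0, multiplier=1.0)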
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/bigquery_dts.py
@@ -38,6 +38,7 @@

 if TYPE_CHECKING:
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from googleapiclient.discovery import Resource


@@ -321,7 +322,7 @@ async def get_transfer_run(
         run_id: str,
         project_id: str | None,
         location: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ):
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/cloud_build.py
@@ -33,6 +33,7 @@
 if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource

 # Time to sleep between active checks of the operation results

@@ -645,7 +646,7 @@ async def get_cloud_build(
         self,
         id_: str,
         project_id: str = PROVIDE_PROJECT_ID,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         location: str = "global",
7 changes: 4 additions & 3 deletions airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -35,6 +35,7 @@
     from google.api_core.operation import Operation
     from google.api_core.operation_async import AsyncOperation
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from google.cloud.orchestration.airflow.service_v1.services.environments.pagers import (
         ListEnvironmentsPager,
     )

@@ -332,7 +333,7 @@ async def create_environment(
         project_id: str,
         region: str,
         environment: Environment | dict,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -361,7 +362,7 @@ async def delete_environment(
         project_id: str,
         region: str,
         environment_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -389,7 +390,7 @@ async def update_environment(
         environment_id: str,
         environment: Environment | dict,
         update_mask: dict | FieldMask,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/dataplex.py
@@ -40,6 +40,7 @@
 if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from googleapiclient.discovery import Resource

 PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"

@@ -896,7 +897,7 @@ async def get_data_scan_job(
         region: str,
         data_scan_id: str | None = None,
         job_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Any:
37 changes: 19 additions & 18 deletions airflow/providers/google/cloud/hooks/dataproc.py
@@ -51,6 +51,7 @@
     from google.api_core.operation_async import AsyncOperation
     from google.api_core.operations_v1.operations_client import OperationsClient
     from google.api_core.retry import Retry
+    from google.api_core.retry_async import AsyncRetry
     from google.protobuf.duration_pb2 import Duration
     from google.protobuf.field_mask_pb2 import FieldMask

@@ -256,7 +257,7 @@ def wait_for_operation(
         self,
         operation: Operation,
         timeout: float | None = None,
-        result_retry: Retry | _MethodDefault = DEFAULT,
+        result_retry: AsyncRetry | _MethodDefault = DEFAULT,
     ) -> Any:
         """Wait for a long-lasting operation to complete."""
         try:

@@ -997,7 +998,7 @@ def wait_for_batch(
         region: str,
         project_id: str,
         wait_check_interval: int = 10,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Batch:

@@ -1132,7 +1133,7 @@ async def create_cluster(
         virtual_cluster_config: dict | None = None,
         labels: dict[str, str] | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -1199,7 +1200,7 @@ async def delete_cluster(
         project_id: str,
         cluster_uuid: str | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -1242,7 +1243,7 @@ async def diagnose_cluster(
         region: str,
         cluster_name: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> str:

@@ -1277,7 +1278,7 @@ async def get_cluster(
         region: str,
         cluster_name: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Cluster:

@@ -1309,7 +1310,7 @@ async def list_clusters(
         filter_: str,
         project_id: str,
         page_size: int | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ):

@@ -1349,7 +1350,7 @@ async def update_cluster(
         region: str,
         graceful_decommission_timeout: dict | Duration | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -1429,7 +1430,7 @@ async def create_workflow_template(
         template: dict | WorkflowTemplate,
         project_id: str,
         region: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> WorkflowTemplate:

@@ -1465,7 +1466,7 @@ async def instantiate_workflow_template(
         version: int | None = None,
         request_id: str | None = None,
         parameters: dict[str, str] | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -1511,7 +1512,7 @@ async def instantiate_inline_workflow_template(
         project_id: str,
         region: str,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -1554,7 +1555,7 @@ async def get_job(
         job_id: str,
         project_id: str,
         region: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Job:

@@ -1588,7 +1589,7 @@ async def submit_job(
         project_id: str,
         region: str,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Job:

@@ -1624,7 +1625,7 @@ async def cancel_job(
         job_id: str,
         project_id: str,
         region: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Job:

@@ -1658,7 +1659,7 @@ async def create_batch(
         batch: dict | Batch,
         batch_id: str | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> AsyncOperation:

@@ -1703,7 +1704,7 @@ async def delete_batch(
         batch_id: str,
         region: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> None:

@@ -1737,7 +1738,7 @@ async def get_batch(
         batch_id: str,
         region: str,
         project_id: str,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
     ) -> Batch:

@@ -1773,7 +1774,7 @@ async def list_batches(
         project_id: str,
         page_size: int | None = None,
         page_token: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         filter: str | None = None,
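
A hypothetical sketch of calling one of the async hook methods above with the new type — the connection id and resource names are placeholders, and the get_batch call mirrors the signature shown in the hunk above:

import asyncio

from google.api_core.retry_async import AsyncRetry

from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook


async def fetch_batch() -> None:
    # The async hook wraps the async gapic client, so its retry
    # arguments must now be AsyncRetry instances.
    hook = DataprocAsyncHook(gcp_conn_id="google_cloud_default")
    batch = await hook.get_batch(
        batch_id="my-batch",
        region="us-central1",
        project_id="my-project",
        retry=AsyncRetry(initial=10.0, maximum=60.0, multiplier=1.0),
    )
    print(batch.state)


asyncio.run(fetch_batch())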
11 changes: 6 additions & 5 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -63,6 +63,7 @@

 if TYPE_CHECKING:
     from google.api_core import operation
+    from google.api_core.retry_async import AsyncRetry
     from google.protobuf.duration_pb2 import Duration
     from google.protobuf.field_mask_pb2 import FieldMask

@@ -592,7 +593,7 @@ def __init__(
         request_id: str | None = None,
         delete_on_error: bool = True,
         use_if_exists: bool = True,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float = 1 * 60 * 60,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",

@@ -985,7 +986,7 @@ def __init__(
         project_id: str | None = None,
         cluster_uuid: str | None = None,
         request_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float = 1 * 60 * 60,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",

@@ -1891,7 +1892,7 @@ def __init__(
         version: int | None = None,
         request_id: str | None = None,
         parameters: dict[str, str] | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",

@@ -2341,7 +2342,7 @@ def __init__(
         region: str,
         request_id: str | None = None,
         project_id: str | None = None,
-        retry: Retry | _MethodDefault = DEFAULT,
+        retry: AsyncRetry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",

@@ -2481,7 +2482,7 @@ def __init__(
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        result_retry: Retry | _MethodDefault = DEFAULT,
+        result_retry: AsyncRetry | _MethodDefault = DEFAULT,
         asynchronous: bool = False,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 5,
14 changes: 7 additions & 7 deletions airflow/providers/google/provider.yaml
@@ -95,25 +95,25 @@ dependencies:
   - google-auth>=1.0.0
   - google-auth-httplib2>=0.0.1
   - google-cloud-aiplatform>=1.22.1
-  - google-cloud-automl>=2.11.0
-  - google-cloud-bigquery-datatransfer>=3.11.0
+  - google-cloud-automl>=2.12.0
+  - google-cloud-bigquery-datatransfer>=3.13.0
   - google-cloud-bigtable>=2.17.0
-  - google-cloud-build>=3.13.0
+  - google-cloud-build>=3.22.0
   - google-cloud-compute>=1.10.0
   - google-cloud-container>=2.17.4
   - google-cloud-datacatalog>=3.11.1
-  - google-cloud-dataflow-client>=0.8.2
+  - google-cloud-dataflow-client>=0.8.6
   - google-cloud-dataform>=0.5.0
-  - google-cloud-dataplex>=1.4.2
-  - google-cloud-dataproc>=5.5.0
+  - google-cloud-dataplex>=1.10.0
+  - google-cloud-dataproc>=5.8.0
   - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0
   - google-cloud-language>=2.9.0
   - google-cloud-logging>=3.5.0
   - google-cloud-memcache>=1.7.0
   - google-cloud-monitoring>=2.14.1
-  - google-cloud-orchestration-airflow>=1.7.0
+  - google-cloud-orchestration-airflow>=1.10.0
   - google-cloud-os-login>=2.9.1
   - google-cloud-pubsub>=2.15.0
   - google-cloud-redis>=2.12.0
14 changes: 7 additions & 7 deletions generated/provider_dependencies.json
@@ -422,26 +422,26 @@
       "google-auth-httplib2>=0.0.1",
       "google-auth>=1.0.0",
       "google-cloud-aiplatform>=1.22.1",
-      "google-cloud-automl>=2.11.0",
+      "google-cloud-automl>=2.12.0",
       "google-cloud-batch>=0.13.0",
-      "google-cloud-bigquery-datatransfer>=3.11.0",
+      "google-cloud-bigquery-datatransfer>=3.13.0",
       "google-cloud-bigtable>=2.17.0",
-      "google-cloud-build>=3.13.0",
+      "google-cloud-build>=3.22.0",
       "google-cloud-compute>=1.10.0",
       "google-cloud-container>=2.17.4",
       "google-cloud-datacatalog>=3.11.1",
-      "google-cloud-dataflow-client>=0.8.2",
+      "google-cloud-dataflow-client>=0.8.6",
       "google-cloud-dataform>=0.5.0",
-      "google-cloud-dataplex>=1.4.2",
+      "google-cloud-dataplex>=1.10.0",
       "google-cloud-dataproc-metastore>=1.12.0",
-      "google-cloud-dataproc>=5.5.0",
+      "google-cloud-dataproc>=5.8.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",
       "google-cloud-language>=2.9.0",
       "google-cloud-logging>=3.5.0",
       "google-cloud-memcache>=1.7.0",
       "google-cloud-monitoring>=2.14.1",
-      "google-cloud-orchestration-airflow>=1.7.0",
+      "google-cloud-orchestration-airflow>=1.10.0",
       "google-cloud-os-login>=2.9.1",
       "google-cloud-pubsub>=2.15.0",
       "google-cloud-redis>=2.12.0",
@@ -22,7 +22,7 @@
 import os
 from datetime import datetime

-from google.api_core.retry import Retry
+from google.api_core.retry_async import AsyncRetry

 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (

@@ -75,7 +75,7 @@
     region=REGION,
     batch=BATCH_CONFIG,
     batch_id=BATCH_ID_2,
-    result_retry=Retry(maximum=10.0, initial=10.0, multiplier=1.0),
+    result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
 )

 create_batch_3 = DataprocCreateBatchOperator(
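
The system-test change above shows the user-facing effect: a retry object passed to an async-backed parameter such as result_retry must now be built from AsyncRetry. For completeness, a self-contained sketch of AsyncRetry wrapping a coroutine (the poll_operation coroutine and its settings are invented for illustration):

import asyncio

from google.api_core.retry_async import AsyncRetry


# AsyncRetry instances can decorate coroutine functions; the plain
# Retry only wraps synchronous callables.
@AsyncRetry(initial=1.0, maximum=10.0, multiplier=2.0)
async def poll_operation() -> str:
    # A real caller would await a gapic client call here and let
    # transient errors drive the retry loop.
    return "DONE"


print(asyncio.run(poll_operation()))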
