remove delegate_to from GCP operators and hooks (#30748)
shahar1 committed Apr 20, 2023
1 parent afdc954 commit fbc1382
Showing 209 changed files with 691 additions and 2,556 deletions.
10 changes: 10 additions & 0 deletions airflow/providers/amazon/CHANGELOG.rst
@@ -24,6 +24,16 @@
Changelog
---------

8.0.0
......

Breaking changes
~~~~~~~~~~~~~~~~

.. warning::
In this version of the provider, the deprecated GCS hook parameter ``delegate_to`` is removed from the following operators: ``GCSToS3Operator``, ``GlacierToGCSOperator`` and ``GoogleApiToS3Operator``.
Impersonation can be achieved instead by utilizing the ``impersonation_chain`` param.
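A minimal migration sketch (the bucket, key and service-account values below are purely illustrative; on this operator the chain is exposed as ``google_impersonation_chain``)::

    from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

    # Before 8.0.0: delegate_to="workload-sa@my-project.iam.gserviceaccount.com"
    # From 8.0.0 on, route the impersonation through the chain instead.
    gcs_to_s3 = GCSToS3Operator(
        task_id="gcs_to_s3",
        bucket="my-gcs-bucket",
        prefix="data/",
        dest_aws_conn_id="aws_default",
        dest_s3_key="s3://my-s3-bucket/data/",
        gcp_conn_id="google_cloud_default",
        google_impersonation_chain="workload-sa@my-project.iam.gserviceaccount.com",
    )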

7.4.1
.....

6 changes: 0 additions & 6 deletions airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -47,9 +47,6 @@ class GCSToS3Operator(BaseOperator):
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:param google_cloud_storage_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
:param delegate_to: Google account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
:param dest_aws_conn_id: The destination S3 connection
:param dest_s3_key: The base S3 key to be used to store the files. (templated)
:param dest_verify: Whether or not to verify SSL certificates for S3 connection.
@@ -101,7 +98,6 @@ def __init__(
delimiter: str | None = None,
gcp_conn_id: str = "google_cloud_default",
google_cloud_storage_conn_id: str | None = None,
delegate_to: str | None = None,
dest_aws_conn_id: str = "aws_default",
dest_s3_key: str,
dest_verify: str | bool | None = None,
@@ -127,7 +123,6 @@ def __init__(
self.prefix = prefix
self.delimiter = delimiter
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.dest_aws_conn_id = dest_aws_conn_id
self.dest_s3_key = dest_s3_key
self.dest_verify = dest_verify
@@ -141,7 +136,6 @@ def execute(self, context: Context) -> list[str]:
# list all files in an Google Cloud Storage bucket
hook = GCSHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.google_impersonation_chain,
)

8 changes: 1 addition & 7 deletions airflow/providers/amazon/aws/transfers/glacier_to_gcs.py
@@ -47,10 +47,7 @@ class GlacierToGCSOperator(BaseOperator):
:param object_name: the name of the object to check in the Google cloud
storage bucket.
:param gzip: option to compress local file or file data for upload
:param chunk_size: size of chunk in bytes the that will downloaded from Glacier vault
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
:param chunk_size: size of chunk in bytes that will be downloaded from Glacier vault
:param google_impersonation_chain: Optional Google service account to impersonate using
short-term credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
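For illustration, a hedged usage sketch after this change (the vault, bucket and service-account names are invented; ``google_impersonation_chain`` now carries what ``delegate_to`` used to):

    from airflow.providers.amazon.aws.transfers.glacier_to_gcs import GlacierToGCSOperator

    transfer_archive = GlacierToGCSOperator(
        task_id="glacier_to_gcs",
        vault_name="my-glacier-vault",
        bucket_name="my-gcs-bucket",
        object_name="glacier/inventory.json",
        gzip=False,
        chunk_size=1024,
        aws_conn_id="aws_default",
        gcp_conn_id="google_cloud_default",
        # impersonation on the GCS side now goes through the chain only
        google_impersonation_chain="workload-sa@my-project.iam.gserviceaccount.com",
    )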
@@ -73,7 +70,6 @@ def __init__(
object_name: str,
gzip: bool,
chunk_size: int = 1024,
delegate_to: str | None = None,
google_impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
@@ -85,14 +81,12 @@ def __init__(
self.object_name = object_name
self.gzip = gzip
self.chunk_size = chunk_size
self.delegate_to = delegate_to
self.impersonation_chain = google_impersonation_chain

def execute(self, context: Context) -> str:
glacier_hook = GlacierHook(aws_conn_id=self.aws_conn_id)
gcs_hook = GCSHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
job_id = glacier_hook.retrieve_inventory(vault_name=self.vault_name)
8 changes: 1 addition & 7 deletions airflow/providers/amazon/aws/transfers/google_api_to_s3.py
@@ -73,13 +73,10 @@ class GoogleApiToS3Operator(BaseOperator):
.. note:: This means the response will be a list of responses.
:param google_api_num_retries: Define the number of retries for the google api requests being made
:param google_api_num_retries: Define the number of retries for the Google API requests being made
if it fails.
:param s3_overwrite: Specifies whether the s3 file will be overwritten if exists.
:param gcp_conn_id: The connection ID to use when fetching connection info.
:param delegate_to: Google account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
:param aws_conn_id: The connection id specifying the authentication information for the S3 Bucket.
:param google_impersonation_chain: Optional Google service account to impersonate using
short-term credentials, or chained list of accounts required to get the access_token
@@ -115,7 +112,6 @@ def __init__(
google_api_num_retries: int = 0,
s3_overwrite: bool = False,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
aws_conn_id: str = "aws_default",
google_impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
@@ -133,7 +129,6 @@ def __init__(
self.google_api_num_retries = google_api_num_retries
self.s3_overwrite = s3_overwrite
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.aws_conn_id = aws_conn_id
self.google_impersonation_chain = google_impersonation_chain
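For illustration, a hedged sketch of the operator after this change (the Sheets endpoint and identifiers are invented examples, not part of this diff):

    from airflow.providers.amazon.aws.transfers.google_api_to_s3 import GoogleApiToS3Operator

    sheets_to_s3 = GoogleApiToS3Operator(
        task_id="google_api_to_s3",
        google_api_service_name="sheets",
        google_api_service_version="v4",
        google_api_endpoint_path="sheets.spreadsheets.values.get",
        google_api_endpoint_params={"spreadsheetId": "SPREADSHEET_ID", "range": "Sheet1!A1:B2"},
        s3_destination_key="s3://my-s3-bucket/google-api/response.json",
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_default",
        # impersonation now flows through the chain instead of delegate_to
        google_impersonation_chain="workload-sa@my-project.iam.gserviceaccount.com",
    )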

@@ -158,7 +153,6 @@ def execute(self, context: Context) -> None:
def _retrieve_data_from_google_api(self) -> dict:
google_discovery_api_hook = GoogleDiscoveryApiHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_service_name=self.google_api_service_name,
api_version=self.google_api_service_version,
impersonation_chain=self.google_impersonation_chain,
1 change: 1 addition & 0 deletions airflow/providers/amazon/provider.yaml
@@ -23,6 +23,7 @@ description: |
suspended: false
versions:
- 8.0.0
- 7.4.1
- 7.4.0
- 7.3.0
10 changes: 10 additions & 0 deletions airflow/providers/apache/beam/CHANGELOG.rst
@@ -24,6 +24,16 @@
Changelog
---------

5.0.0
......

Breaking changes
~~~~~~~~~~~~~~~~

.. warning::
In this version of the provider, the deprecated ``delegate_to`` parameter of the GCS and Dataflow hooks is removed from all Beam operators.
Impersonation can be achieved instead by utilizing the ``impersonation_chain`` param.
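A minimal sketch of the replacement when targeting Dataflow (the project, bucket and account names are illustrative; ``DataflowConfiguration`` comes from the Google provider)::

    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

    run_pipeline = BeamRunPythonPipelineOperator(
        task_id="run_beam_pipeline",
        py_file="gs://my-bucket/pipelines/wordcount.py",
        runner="DataflowRunner",
        gcp_conn_id="google_cloud_default",
        dataflow_config=DataflowConfiguration(
            project_id="my-project",
            location="us-central1",
            # impersonation_chain replaces the removed delegate_to
            impersonation_chain="workload-sa@my-project.iam.gserviceaccount.com",
        ),
    )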

4.3.0
.....

25 changes: 3 additions & 22 deletions airflow/providers/apache/beam/operators/beam.py
@@ -22,7 +22,6 @@
import os
import stat
import tempfile
import warnings
from abc import ABC, ABCMeta, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import ExitStack
@@ -57,7 +56,6 @@ class BeamDataflowMixin(metaclass=ABCMeta):
dataflow_hook: DataflowHook | None
dataflow_config: DataflowConfiguration
gcp_conn_id: str
delegate_to: str | None
dataflow_support_impersonation: bool = True

def _set_dataflow(
@@ -77,7 +75,6 @@ def __set_dataflow_hook(self) -> DataflowHook:
def __set_dataflow_hook(self) -> DataflowHook:
self.dataflow_hook = DataflowHook(
gcp_conn_id=self.dataflow_config.gcp_conn_id or self.gcp_conn_id,
delegate_to=self.dataflow_config.delegate_to or self.delegate_to,
poll_sleep=self.dataflow_config.poll_sleep,
impersonation_chain=self.dataflow_config.impersonation_chain,
drain_pipeline=self.dataflow_config.drain_pipeline,
@@ -146,10 +143,6 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
When defining labels (labels option), you can also provide a dictionary.
:param gcp_conn_id: Optional.
The connection ID to use connecting to Google Cloud Storage if python file is on GCS.
:param delegate_to: Optional.
The account to impersonate using domain-wide delegation of authority,
if any. For this to work, the service account making the request must have
domain-wide delegation enabled.
:param dataflow_config: Dataflow configuration, used when runner type is set to DataflowRunner,
(optional) defaults to None.
"""
@@ -161,7 +154,6 @@ def __init__(
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
dataflow_config: DataflowConfiguration | dict | None = None,
**kwargs,
) -> None:
@@ -170,11 +162,6 @@ def __init__(
self.default_pipeline_options = default_pipeline_options or {}
self.pipeline_options = pipeline_options or {}
self.gcp_conn_id = gcp_conn_id
if delegate_to:
warnings.warn(
"'delegate_to' parameter is deprecated, please use 'impersonation_chain'", DeprecationWarning
)
self.delegate_to = delegate_to
if isinstance(dataflow_config, dict):
self.dataflow_config = DataflowConfiguration(**dataflow_config)
else:
@@ -273,7 +260,6 @@ def __init__(
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
dataflow_config: DataflowConfiguration | dict | None = None,
**kwargs,
) -> None:
@@ -282,7 +268,6 @@ def __init__(
default_pipeline_options=default_pipeline_options,
pipeline_options=pipeline_options,
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
dataflow_config=dataflow_config,
**kwargs,
)
@@ -310,7 +295,7 @@ def execute(self, context: Context):

with ExitStack() as exit_stack:
if self.py_file.lower().startswith("gs://"):
gcs_hook = GCSHook(self.gcp_conn_id, self.delegate_to)
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
self.py_file = tmp_gcs_file.name

@@ -411,7 +396,6 @@ def __init__(
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
dataflow_config: DataflowConfiguration | dict | None = None,
**kwargs,
) -> None:
@@ -420,7 +404,6 @@ def __init__(
default_pipeline_options=default_pipeline_options,
pipeline_options=pipeline_options,
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
dataflow_config=dataflow_config,
**kwargs,
)
@@ -441,7 +424,7 @@ def execute(self, context: Context):

with ExitStack() as exit_stack:
if self.jar.lower().startswith("gs://"):
gcs_hook = GCSHook(self.gcp_conn_id, self.delegate_to)
gcs_hook = GCSHook(self.gcp_conn_id)
tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.jar))
self.jar = tmp_gcs_file.name

@@ -568,7 +551,6 @@ def __init__(
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
dataflow_config: DataflowConfiguration | dict | None = None,
**kwargs,
) -> None:
@@ -577,7 +559,6 @@ def __init__(
default_pipeline_options=default_pipeline_options,
pipeline_options=pipeline_options,
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
dataflow_config=dataflow_config,
**kwargs,
)
@@ -620,7 +601,7 @@ def execute(self, context: Context):

with ExitStack() as exit_stack:
if go_artifact.is_located_on_gcs():
gcs_hook = GCSHook(self.gcp_conn_id, self.delegate_to)
gcs_hook = GCSHook(self.gcp_conn_id)
tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-go"))
go_artifact.download_from_gcs(gcs_hook=gcs_hook, tmp_dir=tmp_dir)

1 change: 1 addition & 0 deletions airflow/providers/apache/beam/provider.yaml
@@ -23,6 +23,7 @@ description: |
suspended: false
versions:
- 5.0.0
- 4.3.0
- 4.2.0
- 4.1.1
5 changes: 5 additions & 0 deletions airflow/providers/google/CHANGELOG.rst
@@ -33,6 +33,11 @@ Google has announced sunset of Campaign Manager 360 v3.5 by Apr 20, 2023. For mo
please check: `<https://developers.google.com/doubleclick-advertisers/deprecation>`_ . As a result, the
default api version for Campaign Manager 360 operator was updated to the latest v4 version.

.. warning::
In this version of the provider, the deprecated ``delegate_to`` param is removed from all GCP operators, hooks, and triggers, as well as from the Firestore and G Suite
transfer operators that interact with GCS. Impersonation can be achieved instead by utilizing the ``impersonation_chain`` param.
The ``delegate_to`` param remains available only in the G Suite and Marketing Platform hooks and operators that do not interact with Google Cloud.
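A hook-level migration could look roughly like this (a sketch; the bucket and service-account address are illustrative)::

    from airflow.providers.google.cloud.hooks.gcs import GCSHook

    # Before: GCSHook(gcp_conn_id="google_cloud_default", delegate_to="user@example.com")
    # After: grant the connection's account the Service Account Token Creator role on the
    # target service account and pass that account via impersonation_chain.
    hook = GCSHook(
        gcp_conn_id="google_cloud_default",
        impersonation_chain="target-sa@my-project.iam.gserviceaccount.com",
    )
    blobs = hook.list(bucket_name="my-bucket")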

9.0.0
.....

11 changes: 5 additions & 6 deletions airflow/providers/google/cloud/hooks/automl.py
@@ -24,7 +24,6 @@
"""
from __future__ import annotations

import warnings
from typing import Sequence

from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
@@ -65,16 +64,16 @@ class CloudAutoMLHook(GoogleBaseHook):
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
if delegate_to:
warnings.warn(
"'delegate_to' parameter is deprecated, please use 'impersonation_chain'", DeprecationWarning
if kwargs.get("delegate_to") is not None:
raise RuntimeError(
"The `delegate_to` parameter has been deprecated before and finally removed in this version"
" of Google Provider. You MUST convert it to `impersonate_chain`"
)
super().__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self._client: AutoMlClient | None = None
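For illustration, a sketch of the behavior this guard introduces (not part of the diff itself):

    from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook

    try:
        # Passing the removed keyword now fails fast with a RuntimeError ...
        CloudAutoMLHook(gcp_conn_id="google_cloud_default", delegate_to="user@example.com")
    except RuntimeError as err:
        print(err)

    # ... while the supported path is the impersonation chain.
    hook = CloudAutoMLHook(
        gcp_conn_id="google_cloud_default",
        impersonation_chain="target-sa@my-project.iam.gserviceaccount.com",
    )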
11 changes: 5 additions & 6 deletions airflow/providers/google/cloud/hooks/bigquery.py
@@ -74,7 +74,6 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
Interact with BigQuery. This hook uses the Google Cloud connection.
:param gcp_conn_id: The Airflow connection used for GCP credentials.
:param delegate_to: This performs a task on one host with reference to other hosts.
:param use_legacy_sql: This specifies whether to use legacy SQL dialect.
:param location: The location of the BigQuery resource.
:param api_resource_configs: This contains params configuration applied for Google BigQuery jobs.
@@ -91,20 +90,20 @@ def __init__(
def __init__(
self,
gcp_conn_id: str = GoogleBaseHook.default_conn_name,
delegate_to: str | None = None,
use_legacy_sql: bool = True,
location: str | None = None,
api_resource_configs: dict | None = None,
impersonation_chain: str | Sequence[str] | None = None,
labels: dict | None = None,
**kwargs,
) -> None:
if delegate_to:
warnings.warn(
"'delegate_to' parameter is deprecated, please use 'impersonation_chain'", DeprecationWarning
if kwargs.get("delegate_to") is not None:
raise RuntimeError(
"The `delegate_to` parameter has been deprecated before and finally removed in this version"
" of Google Provider. You MUST convert it to `impersonate_chain`"
)
super().__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self.use_legacy_sql = use_legacy_sql
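For illustration, a hedged sketch of constructing the hook after this change (the connection id and query are placeholders; ``get_records`` is inherited from ``DbApiHook``):

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    hook = BigQueryHook(
        gcp_conn_id="google_cloud_default",
        use_legacy_sql=False,
        location="EU",
        # impersonation_chain is now the only impersonation mechanism
        impersonation_chain="target-sa@my-project.iam.gserviceaccount.com",
    )
    rows = hook.get_records("SELECT 1")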

1 comment on commit fbc1382

@HammadASiddiqui

Hi,

can someone guide me on this discussion googleapis/google-api-python-client#2228
