Skip to content

Commit

Permalink
Add credential configuration file support to Google Cloud Hook (#31548)
Browse files Browse the repository at this point in the history
Add support to authenticate to GCP using a credential configuration file explicitly defined in a connection. This allows Airflow users to authenticate to GCP using external accounts without relying on the ADC mechanism, allowing the configuration of multiple connections utilizing this mechanism, which offers more flexibility than service account keys.
  • Loading branch information
pgagnon committed May 26, 2023
1 parent e16319b commit ef40148
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 2 deletions.
31 changes: 30 additions & 1 deletion airflow/providers/google/cloud/utils/credentials_provider.py
Expand Up @@ -23,6 +23,7 @@

import json
import logging
import os.path
import tempfile
from contextlib import ExitStack, contextmanager
from typing import Collection, Generator, Sequence
Expand Down Expand Up @@ -196,6 +197,7 @@ def __init__(
self,
key_path: str | None = None,
keyfile_dict: dict[str, str] | None = None,
credential_config_file: dict[str, str] | str | None = None,
key_secret_name: str | None = None,
key_secret_project_id: str | None = None,
scopes: Collection[str] | None = None,
Expand All @@ -213,6 +215,7 @@ def __init__(
)
self.key_path = key_path
self.keyfile_dict = keyfile_dict
self.credential_config_file = credential_config_file
self.key_secret_name = key_secret_name
self.key_secret_project_id = key_secret_project_id
self.scopes = scopes
Expand All @@ -233,6 +236,8 @@ def get_credentials_and_project(self) -> tuple[google.auth.credentials.Credentia
credentials, project_id = self._get_credentials_using_key_secret_name()
elif self.keyfile_dict:
credentials, project_id = self._get_credentials_using_keyfile_dict()
elif self.credential_config_file:
credentials, project_id = self._get_credentials_using_credential_config_file()
else:
credentials, project_id = self._get_credentials_using_adc()

Expand Down Expand Up @@ -311,9 +316,33 @@ def _get_credentials_using_key_secret_name(self):
project_id = credentials.project_id
return credentials, project_id

def _get_credentials_using_credential_config_file(self):
if isinstance(self.credential_config_file, str) and os.path.exists(self.credential_config_file):
self._log_info(
f"Getting connection using credential configuration file: `{self.credential_config_file}`"
)
credentials, project_id = google.auth.load_credentials_from_file(
self.credential_config_file, scopes=self.scopes
)
else:
with tempfile.NamedTemporaryFile(mode="w+t") as temp_credentials_fd:
if isinstance(self.credential_config_file, dict):
self._log_info("Getting connection using credential configuration dict.")
temp_credentials_fd.write(json.dumps(self.credential_config_file))
elif isinstance(self.credential_config_file, str):
self._log_info("Getting connection using credential configuration string.")
temp_credentials_fd.write(self.credential_config_file)

temp_credentials_fd.flush()
credentials, project_id = google.auth.load_credentials_from_file(
temp_credentials_fd.name, scopes=self.scopes
)

return credentials, project_id

def _get_credentials_using_adc(self):
self._log_info(
"Getting connection using `google.auth.default()` since no key file is defined for hook."
"Getting connection using `google.auth.default()` since no explicit credentials are provided."
)
credentials, project_id = google.auth.default(scopes=self.scopes)
return credentials, project_id
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/google/common/hooks/base_google.py
Expand Up @@ -198,6 +198,9 @@ def get_connection_form_widgets() -> dict[str, Any]:
"project": StringField(lazy_gettext("Project Id"), widget=BS3TextFieldWidget()),
"key_path": StringField(lazy_gettext("Keyfile Path"), widget=BS3TextFieldWidget()),
"keyfile_dict": PasswordField(lazy_gettext("Keyfile JSON"), widget=BS3PasswordFieldWidget()),
"credential_config_file": StringField(
lazy_gettext("Credential Configuration File"), widget=BS3TextFieldWidget()
),
"scope": StringField(lazy_gettext("Scopes (comma separated)"), widget=BS3TextFieldWidget()),
"key_secret_name": StringField(
lazy_gettext("Keyfile Secret Name (in GCP Secret Manager)"), widget=BS3TextFieldWidget()
Expand Down Expand Up @@ -251,14 +254,18 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden
keyfile_dict_json = json.loads(keyfile_dict)
except json.decoder.JSONDecodeError:
raise AirflowException("Invalid key JSON.")

key_secret_name: str | None = self._get_field("key_secret_name", None)
key_secret_project_id: str | None = self._get_field("key_secret_project_id", None)

credential_config_file: str | None = self._get_field("credential_config_file", None)

target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain)

credentials, project_id = get_credentials_and_project_id(
key_path=key_path,
keyfile_dict=keyfile_dict_json,
credential_config_file=credential_config_file,
key_secret_name=key_secret_name,
key_secret_project_id=key_secret_project_id,
scopes=self.scopes,
Expand Down
10 changes: 10 additions & 0 deletions docs/apache-airflow-providers-google/connections/gcp.rst
Expand Up @@ -36,6 +36,11 @@ There are two ways to connect to Google Cloud using Airflow.
Key can be specified as a path to the key file (``Keyfile Path``), as a key payload (``Keyfile JSON``)
or as secret in Secret Manager (``Keyfile secret name``). Only one way of defining the key can be used at a time.
If you need to manage multiple keys then you should configure multiple connections.
3. Using a `credential configuration file <https://googleapis.dev/python/google-auth/2.9.0/user-guide.html#external-credentials-workload-identity-federation>`_,
by specifying the path to or the content of a valid credential configuration file.
A credential configuration file is a configuration file that typically contains non-sensitive metadata to instruct
the ``google-auth`` library on how to retrieve external subject tokens and exchange them for service account access
tokens.

.. warning:: Additional permissions might be needed

Expand Down Expand Up @@ -98,6 +103,11 @@ Keyfile JSON

Not required if using application default credentials.

Credential Configuration File
Credential configuration file JSON or path to a credential configuration file on the filesystem.

Not required if using application default credentials.

Secret name which holds Keyfile JSON
Name of the secret in Secret Manager which contains a `service account
<https://cloud.google.com/docs/authentication/#service_accounts>`_ key.
Expand Down
43 changes: 42 additions & 1 deletion tests/providers/google/cloud/utils/test_credentials_provider.py
Expand Up @@ -21,12 +21,14 @@
import os
import re
from io import StringIO
from tempfile import NamedTemporaryFile
from unittest import mock
from unittest.mock import ANY
from uuid import uuid4

import pytest
from google.auth.environment_vars import CREDENTIALS
from google.auth.exceptions import DefaultCredentialsError

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.utils.credentials_provider import (
Expand Down Expand Up @@ -141,7 +143,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_auth_defaul
mock_auth_default.assert_called_once_with(scopes=None)
assert ("CREDENTIALS", "PROJECT_ID") == result
assert (
"Getting connection using `google.auth.default()` since no key file is defined for hook."
"Getting connection using `google.auth.default()` since no explicit credentials are provided."
) in caplog.messages

@mock.patch("google.auth.default")
Expand Down Expand Up @@ -268,6 +270,45 @@ def test_get_credentials_and_project_id_with_service_account_info(
assert (mock_from_service_account_info.return_value, self.test_project_id) == result
assert "Getting connection using JSON Dict" in caplog.messages

@mock.patch("google.auth.load_credentials_from_file", return_value=("CREDENTIALS", "PROJECT_ID"))
def test_get_credentials_using_credential_config_file(self, mock_load_credentials_from_file, caplog):
with caplog.at_level(
level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME
), NamedTemporaryFile() as temp_file:
caplog.clear()
result = get_credentials_and_project_id(credential_config_file=temp_file.name)
mock_load_credentials_from_file.assert_called_once_with(temp_file.name, scopes=None)
assert mock_load_credentials_from_file.return_value == result
assert (
f"Getting connection using credential configuration file: `{temp_file.name}`" in caplog.messages
)

@mock.patch("google.auth.load_credentials_from_file", return_value=("CREDENTIALS", "PROJECT_ID"))
def test_get_credentials_using_credential_config_dict(self, mock_load_credentials_from_file, caplog):
with caplog.at_level(level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME):
caplog.clear()
result = get_credentials_and_project_id(credential_config_file={"type": "external_account"})
mock_load_credentials_from_file.assert_called_once()
assert mock_load_credentials_from_file.return_value == result
assert "Getting connection using credential configuration dict." in caplog.messages

@mock.patch("google.auth.load_credentials_from_file", return_value=("CREDENTIALS", "PROJECT_ID"))
def test_get_credentials_using_credential_config_string(self, mock_load_credentials_from_file, caplog):
with caplog.at_level(level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME):
caplog.clear()
result = get_credentials_and_project_id(credential_config_file='{"type": "external_account"}')
mock_load_credentials_from_file.assert_called_once()
assert mock_load_credentials_from_file.return_value == result
assert "Getting connection using credential configuration string." in caplog.messages

def test_get_credentials_using_credential_config_invalid_string(self, caplog):
with pytest.raises(DefaultCredentialsError), caplog.at_level(
level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME
):
caplog.clear()
get_credentials_and_project_id(credential_config_file="invalid json}}}}")
assert "Getting connection using credential configuration string." in caplog.messages

@mock.patch("google.auth.default", return_value=("CREDENTIALS", "PROJECT_ID"))
@mock.patch("google.oauth2.service_account.Credentials.from_service_account_info")
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._SecretManagerClient")
Expand Down
6 changes: 6 additions & 0 deletions tests/providers/google/common/hooks/test_base_google.py
Expand Up @@ -363,6 +363,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_get_creds_a
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
credential_config_file=None,
key_secret_name=None,
key_secret_project_id=None,
scopes=self.instance.scopes,
Expand Down Expand Up @@ -399,6 +400,7 @@ def test_get_credentials_and_project_id_with_service_account_file(self, mock_get
mock_get_creds_and_proj_id.assert_called_once_with(
key_path="KEY_PATH.json",
keyfile_dict=None,
credential_config_file=None,
key_secret_name=None,
key_secret_project_id=None,
scopes=self.instance.scopes,
Expand Down Expand Up @@ -428,6 +430,7 @@ def test_get_credentials_and_project_id_with_service_account_info(self, mock_get
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=service_account,
credential_config_file=None,
key_secret_name=None,
key_secret_project_id=None,
scopes=self.instance.scopes,
Expand All @@ -447,6 +450,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_delegate(self, moc
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
credential_config_file=None,
key_secret_name=None,
key_secret_project_id=None,
scopes=self.instance.scopes,
Expand Down Expand Up @@ -482,6 +486,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_overridden_project
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
credential_config_file=None,
key_secret_name=None,
key_secret_project_id=None,
scopes=self.instance.scopes,
Expand Down Expand Up @@ -683,6 +688,7 @@ def test_get_credentials_and_project_id_with_impersonation_chain(
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
credential_config_file=None,
key_secret_name=None,
key_secret_project_id=None,
scopes=self.instance.scopes,
Expand Down

0 comments on commit ef40148

Please sign in to comment.