Skip to content

Commit ef40148

Browse files
authored
Add credential configuration file support to Google Cloud Hook (#31548)
Add support to authenticate to GCP using a credential configuration file explicitly defined in a connection. This allows Airflow users to authenticate to GCP using external accounts without relying on the ADC mechanism, allowing the configuration of multiple connections utilizing this mechanism, which offers more flexibility than service account keys.
1 parent e16319b commit ef40148

File tree

5 files changed

+95
-2
lines changed

5 files changed

+95
-2
lines changed

airflow/providers/google/cloud/utils/credentials_provider.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import json
2525
import logging
26+
import os.path
2627
import tempfile
2728
from contextlib import ExitStack, contextmanager
2829
from typing import Collection, Generator, Sequence
@@ -196,6 +197,7 @@ def __init__(
196197
self,
197198
key_path: str | None = None,
198199
keyfile_dict: dict[str, str] | None = None,
200+
credential_config_file: dict[str, str] | str | None = None,
199201
key_secret_name: str | None = None,
200202
key_secret_project_id: str | None = None,
201203
scopes: Collection[str] | None = None,
@@ -213,6 +215,7 @@ def __init__(
213215
)
214216
self.key_path = key_path
215217
self.keyfile_dict = keyfile_dict
218+
self.credential_config_file = credential_config_file
216219
self.key_secret_name = key_secret_name
217220
self.key_secret_project_id = key_secret_project_id
218221
self.scopes = scopes
@@ -233,6 +236,8 @@ def get_credentials_and_project(self) -> tuple[google.auth.credentials.Credentia
233236
credentials, project_id = self._get_credentials_using_key_secret_name()
234237
elif self.keyfile_dict:
235238
credentials, project_id = self._get_credentials_using_keyfile_dict()
239+
elif self.credential_config_file:
240+
credentials, project_id = self._get_credentials_using_credential_config_file()
236241
else:
237242
credentials, project_id = self._get_credentials_using_adc()
238243

@@ -311,9 +316,33 @@ def _get_credentials_using_key_secret_name(self):
311316
project_id = credentials.project_id
312317
return credentials, project_id
313318

319+
def _get_credentials_using_credential_config_file(self):
320+
if isinstance(self.credential_config_file, str) and os.path.exists(self.credential_config_file):
321+
self._log_info(
322+
f"Getting connection using credential configuration file: `{self.credential_config_file}`"
323+
)
324+
credentials, project_id = google.auth.load_credentials_from_file(
325+
self.credential_config_file, scopes=self.scopes
326+
)
327+
else:
328+
with tempfile.NamedTemporaryFile(mode="w+t") as temp_credentials_fd:
329+
if isinstance(self.credential_config_file, dict):
330+
self._log_info("Getting connection using credential configuration dict.")
331+
temp_credentials_fd.write(json.dumps(self.credential_config_file))
332+
elif isinstance(self.credential_config_file, str):
333+
self._log_info("Getting connection using credential configuration string.")
334+
temp_credentials_fd.write(self.credential_config_file)
335+
336+
temp_credentials_fd.flush()
337+
credentials, project_id = google.auth.load_credentials_from_file(
338+
temp_credentials_fd.name, scopes=self.scopes
339+
)
340+
341+
return credentials, project_id
342+
314343
def _get_credentials_using_adc(self):
315344
self._log_info(
316-
"Getting connection using `google.auth.default()` since no key file is defined for hook."
345+
"Getting connection using `google.auth.default()` since no explicit credentials are provided."
317346
)
318347
credentials, project_id = google.auth.default(scopes=self.scopes)
319348
return credentials, project_id

airflow/providers/google/common/hooks/base_google.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ def get_connection_form_widgets() -> dict[str, Any]:
198198
"project": StringField(lazy_gettext("Project Id"), widget=BS3TextFieldWidget()),
199199
"key_path": StringField(lazy_gettext("Keyfile Path"), widget=BS3TextFieldWidget()),
200200
"keyfile_dict": PasswordField(lazy_gettext("Keyfile JSON"), widget=BS3PasswordFieldWidget()),
201+
"credential_config_file": StringField(
202+
lazy_gettext("Credential Configuration File"), widget=BS3TextFieldWidget()
203+
),
201204
"scope": StringField(lazy_gettext("Scopes (comma separated)"), widget=BS3TextFieldWidget()),
202205
"key_secret_name": StringField(
203206
lazy_gettext("Keyfile Secret Name (in GCP Secret Manager)"), widget=BS3TextFieldWidget()
@@ -251,14 +254,18 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden
251254
keyfile_dict_json = json.loads(keyfile_dict)
252255
except json.decoder.JSONDecodeError:
253256
raise AirflowException("Invalid key JSON.")
257+
254258
key_secret_name: str | None = self._get_field("key_secret_name", None)
255259
key_secret_project_id: str | None = self._get_field("key_secret_project_id", None)
256260

261+
credential_config_file: str | None = self._get_field("credential_config_file", None)
262+
257263
target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain)
258264

259265
credentials, project_id = get_credentials_and_project_id(
260266
key_path=key_path,
261267
keyfile_dict=keyfile_dict_json,
268+
credential_config_file=credential_config_file,
262269
key_secret_name=key_secret_name,
263270
key_secret_project_id=key_secret_project_id,
264271
scopes=self.scopes,

docs/apache-airflow-providers-google/connections/gcp.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ There are two ways to connect to Google Cloud using Airflow.
3636
Key can be specified as a path to the key file (``Keyfile Path``), as a key payload (``Keyfile JSON``)
3737
or as secret in Secret Manager (``Keyfile secret name``). Only one way of defining the key can be used at a time.
3838
If you need to manage multiple keys then you should configure multiple connections.
39+
3. Using a `credential configuration file <https://googleapis.dev/python/google-auth/2.9.0/user-guide.html#external-credentials-workload-identity-federation>`_,
40+
by specifying the path to or the content of a valid credential configuration file.
41+
A credential configuration file is a configuration file that typically contains non-sensitive metadata to instruct
42+
the ``google-auth`` library on how to retrieve external subject tokens and exchange them for service account access
43+
tokens.
3944

4045
.. warning:: Additional permissions might be needed
4146

@@ -98,6 +103,11 @@ Keyfile JSON
98103

99104
Not required if using application default credentials.
100105

106+
Credential Configuration File
107+
Credential configuration file JSON or path to a credential configuration file on the filesystem.
108+
109+
Not required if using application default credentials.
110+
101111
Secret name which holds Keyfile JSON
102112
Name of the secret in Secret Manager which contains a `service account
103113
<https://cloud.google.com/docs/authentication/#service_accounts>`_ key.

tests/providers/google/cloud/utils/test_credentials_provider.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import os
2222
import re
2323
from io import StringIO
24+
from tempfile import NamedTemporaryFile
2425
from unittest import mock
2526
from unittest.mock import ANY
2627
from uuid import uuid4
2728

2829
import pytest
2930
from google.auth.environment_vars import CREDENTIALS
31+
from google.auth.exceptions import DefaultCredentialsError
3032

3133
from airflow.exceptions import AirflowException
3234
from airflow.providers.google.cloud.utils.credentials_provider import (
@@ -141,7 +143,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_auth_defaul
141143
mock_auth_default.assert_called_once_with(scopes=None)
142144
assert ("CREDENTIALS", "PROJECT_ID") == result
143145
assert (
144-
"Getting connection using `google.auth.default()` since no key file is defined for hook."
146+
"Getting connection using `google.auth.default()` since no explicit credentials are provided."
145147
) in caplog.messages
146148

147149
@mock.patch("google.auth.default")
@@ -268,6 +270,45 @@ def test_get_credentials_and_project_id_with_service_account_info(
268270
assert (mock_from_service_account_info.return_value, self.test_project_id) == result
269271
assert "Getting connection using JSON Dict" in caplog.messages
270272

273+
@mock.patch("google.auth.load_credentials_from_file", return_value=("CREDENTIALS", "PROJECT_ID"))
274+
def test_get_credentials_using_credential_config_file(self, mock_load_credentials_from_file, caplog):
275+
with caplog.at_level(
276+
level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME
277+
), NamedTemporaryFile() as temp_file:
278+
caplog.clear()
279+
result = get_credentials_and_project_id(credential_config_file=temp_file.name)
280+
mock_load_credentials_from_file.assert_called_once_with(temp_file.name, scopes=None)
281+
assert mock_load_credentials_from_file.return_value == result
282+
assert (
283+
f"Getting connection using credential configuration file: `{temp_file.name}`" in caplog.messages
284+
)
285+
286+
@mock.patch("google.auth.load_credentials_from_file", return_value=("CREDENTIALS", "PROJECT_ID"))
287+
def test_get_credentials_using_credential_config_dict(self, mock_load_credentials_from_file, caplog):
288+
with caplog.at_level(level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME):
289+
caplog.clear()
290+
result = get_credentials_and_project_id(credential_config_file={"type": "external_account"})
291+
mock_load_credentials_from_file.assert_called_once()
292+
assert mock_load_credentials_from_file.return_value == result
293+
assert "Getting connection using credential configuration dict." in caplog.messages
294+
295+
@mock.patch("google.auth.load_credentials_from_file", return_value=("CREDENTIALS", "PROJECT_ID"))
296+
def test_get_credentials_using_credential_config_string(self, mock_load_credentials_from_file, caplog):
297+
with caplog.at_level(level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME):
298+
caplog.clear()
299+
result = get_credentials_and_project_id(credential_config_file='{"type": "external_account"}')
300+
mock_load_credentials_from_file.assert_called_once()
301+
assert mock_load_credentials_from_file.return_value == result
302+
assert "Getting connection using credential configuration string." in caplog.messages
303+
304+
def test_get_credentials_using_credential_config_invalid_string(self, caplog):
305+
with pytest.raises(DefaultCredentialsError), caplog.at_level(
306+
level=logging.DEBUG, logger=CRED_PROVIDER_LOGGER_NAME
307+
):
308+
caplog.clear()
309+
get_credentials_and_project_id(credential_config_file="invalid json}}}}")
310+
assert "Getting connection using credential configuration string." in caplog.messages
311+
271312
@mock.patch("google.auth.default", return_value=("CREDENTIALS", "PROJECT_ID"))
272313
@mock.patch("google.oauth2.service_account.Credentials.from_service_account_info")
273314
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._SecretManagerClient")

tests/providers/google/common/hooks/test_base_google.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_get_creds_a
363363
mock_get_creds_and_proj_id.assert_called_once_with(
364364
key_path=None,
365365
keyfile_dict=None,
366+
credential_config_file=None,
366367
key_secret_name=None,
367368
key_secret_project_id=None,
368369
scopes=self.instance.scopes,
@@ -399,6 +400,7 @@ def test_get_credentials_and_project_id_with_service_account_file(self, mock_get
399400
mock_get_creds_and_proj_id.assert_called_once_with(
400401
key_path="KEY_PATH.json",
401402
keyfile_dict=None,
403+
credential_config_file=None,
402404
key_secret_name=None,
403405
key_secret_project_id=None,
404406
scopes=self.instance.scopes,
@@ -428,6 +430,7 @@ def test_get_credentials_and_project_id_with_service_account_info(self, mock_get
428430
mock_get_creds_and_proj_id.assert_called_once_with(
429431
key_path=None,
430432
keyfile_dict=service_account,
433+
credential_config_file=None,
431434
key_secret_name=None,
432435
key_secret_project_id=None,
433436
scopes=self.instance.scopes,
@@ -447,6 +450,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_delegate(self, moc
447450
mock_get_creds_and_proj_id.assert_called_once_with(
448451
key_path=None,
449452
keyfile_dict=None,
453+
credential_config_file=None,
450454
key_secret_name=None,
451455
key_secret_project_id=None,
452456
scopes=self.instance.scopes,
@@ -482,6 +486,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_overridden_project
482486
mock_get_creds_and_proj_id.assert_called_once_with(
483487
key_path=None,
484488
keyfile_dict=None,
489+
credential_config_file=None,
485490
key_secret_name=None,
486491
key_secret_project_id=None,
487492
scopes=self.instance.scopes,
@@ -683,6 +688,7 @@ def test_get_credentials_and_project_id_with_impersonation_chain(
683688
mock_get_creds_and_proj_id.assert_called_once_with(
684689
key_path=None,
685690
keyfile_dict=None,
691+
credential_config_file=None,
686692
key_secret_name=None,
687693
key_secret_project_id=None,
688694
scopes=self.instance.scopes,

0 commit comments

Comments
 (0)