Skip to content

Commit

Permalink
Add support in GCP connection for reading key from Secret Manager (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
keze committed Nov 14, 2021
1 parent 244627e commit dc0159e
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 13 deletions.
37 changes: 34 additions & 3 deletions airflow/providers/google/cloud/utils/credentials_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from google.auth.environment_vars import CREDENTIALS, LEGACY_PROJECT, PROJECT

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud._internal_client.secret_manager_client import _SecretManagerClient
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.process_utils import patch_environ

Expand Down Expand Up @@ -202,20 +203,23 @@ def __init__(
self,
key_path: Optional[str] = None,
keyfile_dict: Optional[Dict[str, str]] = None,
key_secret_name: Optional[str] = None,
scopes: Optional[Collection[str]] = None,
delegate_to: Optional[str] = None,
disable_logging: bool = False,
target_principal: Optional[str] = None,
delegates: Optional[Sequence[str]] = None,
) -> None:
super().__init__()
if key_path and keyfile_dict:
key_options = [key_path, key_secret_name, keyfile_dict]
if len([x for x in key_options if x]) > 1:
raise AirflowException(
"The `keyfile_dict` and `key_path` fields are mutually exclusive. "
"Please provide only one value."
"The `keyfile_dict`, `key_path`, and `key_secret_name` fields"
"are all mutually exclusive. Please provide only one value."
)
self.key_path = key_path
self.keyfile_dict = keyfile_dict
self.key_secret_name = key_secret_name
self.scopes = scopes
self.delegate_to = delegate_to
self.disable_logging = disable_logging
Expand All @@ -231,6 +235,8 @@ def get_credentials_and_project(self) -> Tuple[google.auth.credentials.Credentia
"""
if self.key_path:
credentials, project_id = self._get_credentials_using_key_path()
elif self.key_secret_name:
credentials, project_id = self._get_credentials_using_key_secret_name()
elif self.keyfile_dict:
credentials, project_id = self._get_credentials_using_keyfile_dict()
else:
Expand Down Expand Up @@ -283,6 +289,31 @@ def _get_credentials_using_key_path(self):
project_id = credentials.project_id
return credentials, project_id

def _get_credentials_using_key_secret_name(self):
self._log_debug('Getting connection using JSON key data from GCP secret: %s', self.key_secret_name)

# Use ADC to access GCP Secret Manager.
adc_credentials, adc_project_id = google.auth.default(scopes=self.scopes)
secret_manager_client = _SecretManagerClient(credentials=adc_credentials)

if not secret_manager_client.is_valid_secret_name(self.key_secret_name):
raise AirflowException('Invalid secret name specified for fetching JSON key data.')

secret_value = secret_manager_client.get_secret(self.key_secret_name, adc_project_id)
if secret_value is None:
raise AirflowException(f"Failed getting value of secret {self.key_secret_name}.")

try:
keyfile_dict = json.loads(secret_value)
except json.decoder.JSONDecodeError:
raise AirflowException('Key data read from GCP Secret Manager is not valid JSON.')

credentials = google.oauth2.service_account.Credentials.from_service_account_info(
keyfile_dict, scopes=self.scopes
)
project_id = credentials.project_id
return credentials, project_id

def _get_credentials_using_adc(self):
self._log_info(
'Getting connection using `google.auth.default()` since no key file is defined for hook.'
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/google/common/hooks/base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ def get_connection_form_widgets() -> Dict[str, Any]:
"extra__google_cloud_platform__scope": StringField(
lazy_gettext('Scopes (comma separated)'), widget=BS3TextFieldWidget()
),
"extra__google_cloud_platform__key_secret_name": StringField(
lazy_gettext('Keyfile Secret Name (in GCP Secret Manager)'), widget=BS3TextFieldWidget()
),
"extra__google_cloud_platform__num_retries": IntegerField(
lazy_gettext('Number of Retries'),
validators=[NumberRange(min=0)],
Expand Down Expand Up @@ -225,12 +228,14 @@ def _get_credentials_and_project_id(self) -> Tuple[google.auth.credentials.Crede
keyfile_dict_json = json.loads(keyfile_dict)
except json.decoder.JSONDecodeError:
raise AirflowException('Invalid key JSON.')
key_secret_name: Optional[str] = self._get_field('key_secret_name', None)

target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain)

credentials, project_id = get_credentials_and_project_id(
key_path=key_path,
keyfile_dict=keyfile_dict_json,
key_secret_name=key_secret_name,
scopes=self.scopes,
delegate_to=self.delegate_to,
target_principal=target_principal,
Expand Down
32 changes: 24 additions & 8 deletions docs/apache-airflow-providers-google/connections/gcp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ The Google Cloud connection type enables the Google Cloud Integrations.
Authenticating to Google Cloud
------------------------------

There are three ways to connect to Google Cloud using Airflow.
There are two ways to connect to Google Cloud using Airflow.

1. Use `Application Default Credentials
1. Using a `Application Default Credentials
<https://google-auth.readthedocs.io/en/latest/reference/google.auth.html#google.auth.default>`_,
2. Use a `service account
<https://cloud.google.com/docs/authentication/#service_accounts>`_ key
file (JSON format) on disk - ``Keyfile Path``.
3. Use a service account key file (JSON format) from connection configuration - ``Keyfile JSON``.
2. Using a `service account
<https://cloud.google.com/docs/authentication/#service_accounts>`_ by specifying a key file in JSON format.
Key can be specified as a path to the key file (``Keyfile Path``), as a key payload (``Keyfile JSON``)
or as secret in Secret Manager (``Keyfile secret name``). Only one way of defining the key can be used at a time.
If you need to manage multiple keys then you should configure multiple connections.

Only one authorization method can be used at a time. If you need to manage multiple keys then you should
configure multiple connections.
.. warning:: Additional permissions might be needed

Connection which uses key from the Secret Manager requires that `Application Default Credentials
<https://google-auth.readthedocs.io/en/latest/reference/google.auth.html#google.auth.default>`_ (ADC)
have permission to access payloads of secrets.

.. note:: Alternative way of storing connections

Besides storing only key in Secret Manager there is an option for storing entire connection.
For more details take a look at :ref:`Google Secret Manager Backend <google_cloud_secret_manager_backend>`.

Default Connection IDs
----------------------
Expand Down Expand Up @@ -89,6 +98,12 @@ Keyfile JSON

Not required if using application default credentials.

Secret name which holds Keyfile JSON
Name of the secret in Secret Manager which contains a `service account
<https://cloud.google.com/docs/authentication/#service_accounts>`_ key.

Not required if using application default credentials.

Scopes (comma separated)
A list of comma-separated `Google Cloud scopes
<https://developers.google.com/identity/protocols/googlescopes>`_ to
Expand All @@ -112,6 +127,7 @@ Number of Retries
* ``extra__google_cloud_platform__project`` - Project Id
* ``extra__google_cloud_platform__key_path`` - Keyfile Path
* ``extra__google_cloud_platform__keyfile_dict`` - Keyfile JSON
* ``extra__google_cloud_platform__key_secret_name`` - Secret name which holds Keyfile JSON
* ``extra__google_cloud_platform__scope`` - Scopes
* ``extra__google_cloud_platform__num_retries`` - Number of Retries

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ command as in the example below.
--replication-policy=automatic
Created version [1] of the secret [airflow-variables-first-variable].
.. note:: If only key of the connection should be hidden there is an option to store
only that key in Cloud Secret Manager and not entire connection. For more details take
a look at :ref:`Google Cloud Connection <howto/connection:gcp>`.

Checking configuration
======================

Expand Down
43 changes: 42 additions & 1 deletion tests/providers/google/cloud/utils/test_credentials_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import unittest
from io import StringIO
from unittest import mock
from unittest.mock import ANY
from uuid import uuid4

import pytest
Expand Down Expand Up @@ -267,12 +268,52 @@ def test_get_credentials_and_project_id_with_service_account_info(self, mock_fro
'connection using JSON Dict'
] == cm.output

@mock.patch("google.auth.default", return_value=("CREDENTIALS", "PROJECT_ID"))
@mock.patch('google.oauth2.service_account.Credentials.from_service_account_info')
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._SecretManagerClient")
def test_get_credentials_and_project_id_with_key_secret_name(
self, mock_secret_manager_client, mock_from_service_account_info, mock_default
):
mock_secret_manager_client.return_value.is_valid_secret_name.return_value = True
mock_secret_manager_client.return_value.get_secret.return_value = (
"{\"type\":\"service_account\",\"project_id\":\"pid\","
"\"private_key_id\":\"pkid\","
"\"private_key\":\"payload\"}"
)

get_credentials_and_project_id(key_secret_name="secret name")
mock_from_service_account_info.assert_called_once_with(
{
'type': 'service_account',
'project_id': 'pid',
'private_key_id': 'pkid',
'private_key': 'payload',
},
scopes=ANY,
)

@mock.patch("google.auth.default", return_value=("CREDENTIALS", "PROJECT_ID"))
@mock.patch("airflow.providers.google.cloud.utils.credentials_provider._SecretManagerClient")
def test_get_credentials_and_project_id_with_key_secret_name_when_key_is_invalid(
self, mock_secret_manager_client, mock_default
):
mock_secret_manager_client.return_value.is_valid_secret_name.return_value = True
mock_secret_manager_client.return_value.get_secret.return_value = ""

with pytest.raises(
AirflowException,
match=re.escape('Key data read from GCP Secret Manager is not valid JSON.'),
):
get_credentials_and_project_id(key_secret_name="secret name")

def test_get_credentials_and_project_id_with_mutually_exclusive_configuration(
self,
):
with pytest.raises(
AirflowException,
match=re.escape('The `keyfile_dict` and `key_path` fields are mutually exclusive.'),
match=re.escape(
'The `keyfile_dict`, `key_path`, and `key_secret_name` fieldsare all mutually exclusive.'
),
):
get_credentials_and_project_id(key_path='KEY.json', keyfile_dict={'private_key': 'PRIVATE_KEY'})

Expand Down
10 changes: 9 additions & 1 deletion tests/providers/google/common/hooks/test_base_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ def test_get_credentials_and_project_id_with_default_auth(self, mock_get_creds_a
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
key_secret_name=None,
scopes=self.instance.scopes,
delegate_to=None,
target_principal=None,
Expand All @@ -348,6 +349,7 @@ def test_get_credentials_and_project_id_with_service_account_file(self, mock_get
mock_get_creds_and_proj_id.assert_called_once_with(
key_path='KEY_PATH.json',
keyfile_dict=None,
key_secret_name=None,
scopes=self.instance.scopes,
delegate_to=None,
target_principal=None,
Expand Down Expand Up @@ -375,6 +377,7 @@ def test_get_credentials_and_project_id_with_service_account_info(self, mock_get
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=service_account,
key_secret_name=None,
scopes=self.instance.scopes,
delegate_to=None,
target_principal=None,
Expand All @@ -392,6 +395,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_delegate(self, moc
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
key_secret_name=None,
scopes=self.instance.scopes,
delegate_to="USER",
target_principal=None,
Expand Down Expand Up @@ -425,6 +429,7 @@ def test_get_credentials_and_project_id_with_default_auth_and_overridden_project
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
key_secret_name=None,
scopes=self.instance.scopes,
delegate_to=None,
target_principal=None,
Expand All @@ -442,7 +447,9 @@ def test_get_credentials_and_project_id_with_mutually_exclusive_configuration(
}
with pytest.raises(
AirflowException,
match=re.escape('The `keyfile_dict` and `key_path` fields are mutually exclusive.'),
match=re.escape(
"The `keyfile_dict`, `key_path`, and `key_secret_name` fields" "are all mutually exclusive. "
),
):
self.instance._get_credentials_and_project_id()

Expand Down Expand Up @@ -626,6 +633,7 @@ def test_get_credentials_and_project_id_with_impersonation_chain(
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
keyfile_dict=None,
key_secret_name=None,
scopes=self.instance.scopes,
delegate_to=None,
target_principal=target_principal,
Expand Down

0 comments on commit dc0159e

Please sign in to comment.