Skip to content

Commit

Permalink
Get Airflow Variables from GCP Secrets Manager (#7946)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil committed Mar 28, 2020
1 parent c1c88ab commit 0e1c238
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 5 deletions.
37 changes: 34 additions & 3 deletions airflow/providers/google/cloud/secrets/secrets_manager.py
Expand Up @@ -51,10 +51,18 @@ class CloudSecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
For example, if the Secrets Manager secret id is ``airflow-connections-smtp_default``, this would be
accessiblen if you provide ``{"connections_prefix": "airflow-connections", "sep": "-"}`` and request
conn_id ``smtp_default``. The full secret id should follow the pattern "[a-zA-Z0-9-_]".
conn_id ``smtp_default``.
If the Secrets Manager secret id is ``airflow-variables-hello``, this would be
accessible if you provide ``{"variables_prefix": "airflow-variables", "sep": "-"}`` and request
Variable Key ``hello``.
The full secret id should follow the pattern "[a-zA-Z0-9-_]".
:param connections_prefix: Specifies the prefix of the secret to read to get Connections.
:type connections_prefix: str
:param variables_prefix: Specifies the prefix of the secret to read to get Variables.
:type variables_prefix: str
:param gcp_key_path: Path to GCP Credential JSON file;
use default credentials in the current environment if not provided.
:type gcp_key_path: str
Expand All @@ -66,21 +74,24 @@ class CloudSecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
def __init__(
self,
connections_prefix: str = "airflow-connections",
variables_prefix: str = "airflow-variables",
gcp_key_path: Optional[str] = None,
gcp_scopes: Optional[str] = None,
sep: str = "-",
**kwargs
):
super().__init__(**kwargs)
self.connections_prefix = connections_prefix
self.variables_prefix = variables_prefix
self.gcp_key_path = gcp_key_path
self.gcp_scopes = gcp_scopes
self.sep = sep
self.credentials: Optional[str] = None
self.project_id: Optional[str] = None
if not self._is_valid_prefix_and_sep():
raise AirflowException(
f"`connections_prefix` and `sep` should follows that pattern {SECRET_ID_PATTERN}"
"`connections_prefix`, `variables_prefix` and `sep` should "
f"follows that pattern {SECRET_ID_PATTERN}"
)

def _is_valid_prefix_and_sep(self) -> bool:
Expand Down Expand Up @@ -110,7 +121,27 @@ def get_conn_uri(self, conn_id: str) -> Optional[str]:
:param conn_id: connection id
:type conn_id: str
"""
secret_id = self.build_path(self.connections_prefix, conn_id, self.sep)
return self._get_secret(self.connections_prefix, conn_id)

def get_variable(self, key: str) -> Optional[str]:
"""
Get Airflow Variable from Environment Variable
:param key: Variable Key
:return: Variable Value
"""
return self._get_secret(self.variables_prefix, key)

def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]:
"""
Get secret value from Parameter Store.
:param path_prefix: Prefix for the Path to get Secret
:type path_prefix: str
:param secret_id: Secret Key
:type secret_id: str
"""
secret_id = self.build_path(path_prefix, secret_id, self.sep)
# always return the latest version of the secret
secret_version = "latest"
name = self.client.secret_version_path(self.project_id, secret_id, secret_version)
Expand Down
22 changes: 20 additions & 2 deletions docs/howto/use-alternative-secrets-backend.rst
Expand Up @@ -183,26 +183,44 @@ Note that the secret ``Key`` is ``value``, and secret ``Value`` is ``world`` and
GCP Secrets Manager Backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^

To enable GCP Secrets Manager to retrieve connection, specify :py:class:`~airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerBackend`
To enable GCP Secrets Manager to retrieve connection/variables, specify :py:class:`~airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerBackend`
as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``.

Available parameters to ``backend_kwargs``:

* ``connections_prefix``: Specifies the prefix of the secret to read to get Connections.
* ``variables_prefix``: Specifies the prefix of the secret to read to get Variables.
* ``gcp_key_path``: Path to GCP Credential JSON file
* ``gcp_scopes``: Comma-separated string containing GCP scopes
* ``sep``: separator used to concatenate connections_prefix and conn_id. Default: "-"

Note: The full GCP Secrets Manager secret id should follow the pattern "[a-zA-Z0-9-_]".

Here is a sample configuration:
Here is a sample configuration if you want to just retrieve connections:

.. code-block:: ini
[secrets]
backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow-connections", "sep": "-"}
Here is a sample configuration if you want to just retrieve variables:

.. code-block:: ini
[secrets]
backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerBackend
backend_kwargs = {"variables_prefix": "airflow-variables", "sep": "-"}
and if you want to retrieve both Variables and connections use the following sample config:

.. code-block:: ini
[secrets]
backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables", "sep": "-"}
When ``gcp_key_path`` is not provided, it will use the Application Default Credentials in the current environment. You can set up the credentials with:

.. code-block:: ini
Expand Down
45 changes: 45 additions & 0 deletions tests/providers/google/cloud/secrets/test_secrets_manager.py
Expand Up @@ -29,9 +29,12 @@
KEY_FILE = 'test-file.json'
PROJECT_ID = 'test-project-id'
CONNECTIONS_PREFIX = "test-connections"
VARIABLES_PREFIX = "test-variables"
SEP = '-'
CONN_ID = 'test-postgres'
CONN_URI = 'postgresql://airflow:airflow@host:5432/airflow'
VAR_KEY = 'hello'
VAR_VALUE = 'world'

MODULE_NAME = "airflow.providers.google.cloud.secrets.secrets_manager"

Expand Down Expand Up @@ -116,3 +119,45 @@ def test_get_conn_uri_non_existent_key(self, mock_client_callable, mock_get_cred
log_output.output[0],
f"GCP API Call Error \\(NotFound\\): Secret ID {secret_id} not found"
)

@parameterized.expand([
"airflow-variables",
"variables",
"airflow"
])
@mock.patch(MODULE_NAME + ".get_credentials_and_project_id")
@mock.patch(MODULE_NAME + ".SecretManagerServiceClient")
def test_get_variable(self, variables_prefix, mock_client_callable, mock_get_creds):
mock_get_creds.return_value = CREDENTIALS, PROJECT_ID
mock_client = mock.MagicMock()
mock_client_callable.return_value = mock_client

test_response = AccessSecretVersionResponse()
test_response.payload.data = VAR_VALUE.encode("UTF-8")
mock_client.access_secret_version.return_value = test_response

secrets_manager_backend = CloudSecretsManagerBackend(variables_prefix=variables_prefix)
secret_id = secrets_manager_backend.build_path(variables_prefix, VAR_KEY, SEP)
returned_uri = secrets_manager_backend.get_variable(VAR_KEY)
self.assertEqual(VAR_VALUE, returned_uri)
mock_client.secret_version_path.assert_called_once_with(
PROJECT_ID, secret_id, "latest"
)

@mock.patch(MODULE_NAME + ".get_credentials_and_project_id")
@mock.patch(MODULE_NAME + ".SecretManagerServiceClient")
def test_get_variable_non_existent_key(self, mock_client_callable, mock_get_creds):
mock_get_creds.return_value = CREDENTIALS, PROJECT_ID
mock_client = mock.MagicMock()
mock_client_callable.return_value = mock_client
# The requested secret id or secret version does not exist
mock_client.access_secret_version.side_effect = NotFound('test-msg')

secrets_manager_backend = CloudSecretsManagerBackend(variables_prefix=VARIABLES_PREFIX)
secret_id = secrets_manager_backend.build_path(VARIABLES_PREFIX, VAR_KEY, SEP)
with self.assertLogs(secrets_manager_backend.log, level="ERROR") as log_output:
self.assertIsNone(secrets_manager_backend.get_variable(VAR_KEY))
self.assertRegex(
log_output.output[0],
f"GCP API Call Error \\(NotFound\\): Secret ID {secret_id} not found"
)

0 comments on commit 0e1c238

Please sign in to comment.