Fixes AzureFileShare connection extras (#16388)
* Fixes AzureFileShare connection extras

The Azure File Share connection was not created in #15159, which
caused an unexpected side effect: the default Azure connection
passed a service_options dictionary to FileService
containing a key that FileService did not expect.

This change fixes two things:

1) adds AzureFileShare connection that has separate conn_type
   and handles the extra_options specific for FileService Hook
   available in the Airflow UI.

2) handles the "deprecated" way of passing keys without UI prefix
   but raises a deprecation warning when such key is passed or
   when the Wasb connection is used with an empty extras rather
   than Azure File Share.

Fixes #16254

* fixup! Fixes AzureFileShare connection extras

* fixup! fixup! Fixes AzureFileShare connection extras
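
The two points in the commit message can be sketched as a small standalone function. This is a minimal illustration and not the hook's actual code: `normalize_extras` is a hypothetical helper name, and the warning text is shortened. The prefix and the branching follow the `get_conn` change in this commit.

```python
import warnings
from typing import Any, Dict

# Prefix that the Airflow UI adds to Azure File Share extras (as used in this commit).
PREFIX = "extra__azure_fileshare__"


def normalize_extras(extras: Dict[str, Any], prefix: str = PREFIX) -> Dict[str, Any]:
    """Hypothetical helper: turn connection extras into FileService keyword arguments."""
    options: Dict[str, Any] = {}
    for key, value in extras.items():
        if key.startswith(prefix):
            if value != "":
                # UI-style key: strip the prefix before passing the value on.
                options[key[len(prefix):]] = value
            else:
                # Empty UI field: skip it and warn, mirroring the commit's behaviour.
                warnings.warn(
                    "Deprecated connection used for AzureFileShareHook.",
                    DeprecationWarning,
                    stacklevel=2,
                )
        else:
            # Legacy key without the UI prefix: still accepted, but deprecated.
            options[key] = value
            warnings.warn(
                "Deprecated connection used for AzureFileShareHook.",
                DeprecationWarning,
                stacklevel=2,
            )
    return options
```

Under these assumptions, `{"extra__azure_fileshare__sas_token": "tok"}` and the legacy `{"sas_token": "tok"}` both yield `{"sas_token": "tok"}`; only the legacy form emits a `DeprecationWarning`.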
potiuk committed Jun 11, 2021
1 parent e975e86 commit 0c80a7d
Showing 17 changed files with 492 additions and 341 deletions.
@@ -47,7 +47,7 @@
share_name=AZURE_SHARE_NAME,
dest_gcs=DEST_GCS_BUCKET,
directory_name=AZURE_DIRECTORY_NAME,
wasb_conn_id='azure_fileshare_default',
azure_fileshare_conn_id='azure_fileshare_default',
gcp_conn_id='google_cloud_default',
replace=False,
gzip=True,
@@ -38,8 +38,8 @@ class AzureFileShareToGCSOperator(BaseOperator):
:param prefix: Prefix string which filters objects whose name begin with
such prefix. (templated)
:type prefix: str
:param wasb_conn_id: The source WASB connection
:type wasb_conn_id: str
:param azure_fileshare_conn_id: The source Azure File Share connection
:type azure_fileshare_conn_id: str
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
:type gcp_conn_id: str
:param dest_gcs: The destination Google Cloud Storage bucket and prefix
@@ -82,7 +82,7 @@ def __init__(
dest_gcs: str,
directory_name: Optional[str] = None,
prefix: str = '',
wasb_conn_id: str = 'wasb_default',
azure_fileshare_conn_id: str = 'azure_fileshare_default',
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None,
replace: bool = False,
@@ -95,7 +95,7 @@ def __init__(
self.share_name = share_name
self.directory_name = directory_name
self.prefix = prefix
self.wasb_conn_id = wasb_conn_id
self.azure_fileshare_conn_id = azure_fileshare_conn_id
self.gcp_conn_id = gcp_conn_id
self.dest_gcs = dest_gcs
self.delegate_to = delegate_to
@@ -114,7 +114,7 @@ def __init__(
)

def execute(self, context):
azure_fileshare_hook = AzureFileShareHook(self.wasb_conn_id)
azure_fileshare_hook = AzureFileShareHook(self.azure_fileshare_conn_id)
files = azure_fileshare_hook.list_files(
share_name=self.share_name, directory_name=self.directory_name
)
5 changes: 5 additions & 0 deletions airflow/providers/microsoft/azure/CHANGELOG.rst
@@ -27,6 +27,11 @@ Breaking changes

* ``Auto-apply apply_default decorator (#15667)``


``Azure Container Volume`` and ``Azure File Share`` now have dedicated connection types with editable
UI fields. You should no longer use the ``Wasb`` connection type for those connections. The names of the
connection ids for those hooks/operators were changed to reflect that.

Features
~~~~~~~~

59 changes: 52 additions & 7 deletions airflow/providers/microsoft/azure/hooks/azure_container_volume.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict

from azure.mgmt.containerinstance.models import AzureFileVolume, Volume

@@ -25,22 +26,66 @@ class AzureContainerVolumeHook(BaseHook):
"""
A hook which wraps an Azure Volume.
:param wasb_conn_id: :ref:`Wasb connection id<howto/connection:wasb>` of an Azure storage
account of which file shares should be mounted.
:type wasb_conn_id: str
:param azure_container_volume_conn_id: Reference to the
:ref:`Azure Container Volume connection id <howto/connection:azure_container_volume>`
of an Azure account of which container volumes should be used.
:type azure_container_volume_conn_id: str
"""

def __init__(self, wasb_conn_id: str = 'wasb_default') -> None:
conn_name_attr = "azure_container_volume_conn_id"
default_conn_name = 'azure_container_volume_default'
conn_type = 'azure_container_volume'
hook_name = 'Azure Container Volume'

def __init__(self, azure_container_volume_conn_id: str = 'azure_container_volume_default') -> None:
super().__init__()
self.conn_id = wasb_conn_id
self.conn_id = azure_container_volume_conn_id

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget
from flask_babel import lazy_gettext
from wtforms import PasswordField

return {
"extra__azure_container_volume__connection_string": PasswordField(
lazy_gettext('Blob Storage Connection String (optional)'), widget=BS3PasswordFieldWidget()
),
}

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
import json

return {
"hidden_fields": ['schema', 'port', 'host', "extra"],
"relabeling": {
'login': 'Azure Client ID',
'password': 'Azure Secret',
},
"placeholders": {
'extra': json.dumps(
{
"key_path": "path to json file for auth",
"key_json": "specifies json dict for auth",
},
indent=1,
),
'login': 'client_id (token credentials auth)',
'password': 'secret (token credentials auth)',
'extra__azure_container_volume__connection_string': 'connection string auth',
},
}

def get_storagekey(self) -> str:
"""Get Azure File Volume storage key"""
conn = self.get_connection(self.conn_id)
service_options = conn.extra_dejson

if 'connection_string' in service_options:
for keyvalue in service_options['connection_string'].split(";"):
if 'extra__azure_container_volume__connection_string' in service_options:
for keyvalue in service_options['extra__azure_container_volume__connection_string'].split(";"):
key, value = keyvalue.split("=", 1)
if key == "AccountKey":
return value
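
The connection-string lookup in `get_storagekey` can be illustrated in isolation. The following is a sketch under stated assumptions, not the hook's code: `account_key_from_connection_string` is a hypothetical name, and skipping empty segments (so a trailing `;` does not raise) is an added choice that the hunk above does not make.

```python
from typing import Optional


def account_key_from_connection_string(connection_string: str) -> Optional[str]:
    """Hypothetical helper: return the AccountKey value, or None if it is absent."""
    for pair in connection_string.split(";"):
        if not pair:
            # Tolerate a trailing or doubled ';' (an assumption, not in the diff).
            continue
        # Split on the first '=' only, so base64 padding in the key survives.
        key, _, value = pair.partition("=")
        if key == "AccountKey":
            return value
    return None
```

Splitting on the first `=` only matters because storage account keys are base64-encoded and commonly end in `=` padding.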
90 changes: 79 additions & 11 deletions airflow/providers/microsoft/azure/hooks/azure_fileshare.py
@@ -16,7 +16,8 @@
# specific language governing permissions and limitations
# under the License.
#
from typing import List, Optional
import warnings
from typing import Any, Dict, List, Optional

from azure.storage.file import File, FileService

@@ -27,24 +28,91 @@ class AzureFileShareHook(BaseHook):
"""
Interacts with Azure FileShare Storage.
Additional options passed in the 'extra' field of the connection will be
passed to the `FileService()` constructor.
:param azure_fileshare_conn_id: Reference to the
:ref:`Azure FileShare connection id<howto/connection:azure_fileshare>`
of an Azure account of which file shares should be used.
:param wasb_conn_id: Reference to the :ref:`wasb connection <howto/connection:wasb>`.
:type wasb_conn_id: str
"""

def __init__(self, wasb_conn_id: str = 'wasb_default') -> None:
conn_name_attr = "azure_fileshare_conn_id"
default_conn_name = 'azure_fileshare_default'
conn_type = 'azure_fileshare'
hook_name = 'Azure FileShare'

def __init__(self, azure_fileshare_conn_id: str = 'azure_fileshare_default') -> None:
super().__init__()
self.conn_id = wasb_conn_id
self.conn_id = azure_fileshare_conn_id
self._conn = None

@staticmethod
def get_connection_form_widgets() -> Dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import PasswordField, StringField

return {
"extra__azure_fileshare__sas_token": PasswordField(
lazy_gettext('SAS Token (optional)'), widget=BS3PasswordFieldWidget()
),
"extra__azure_fileshare__connection_string": StringField(
lazy_gettext('Connection String (optional)'), widget=BS3TextFieldWidget()
),
"extra__azure_fileshare__protocol": StringField(
lazy_gettext('Account URL or token (optional)'), widget=BS3TextFieldWidget()
),
}

@staticmethod
def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['schema', 'port', 'host', 'extra'],
"relabeling": {
'login': 'Blob Storage Login (optional)',
'password': 'Blob Storage Key (optional)',
'host': 'Account Name (Active Directory Auth)',
},
"placeholders": {
'login': 'account name',
'password': 'secret',
'host': 'account url',
'extra__azure_fileshare__sas_token': 'account url or token (optional)',
'extra__azure_fileshare__connection_string': 'account url or token (optional)',
'extra__azure_fileshare__protocol': 'account url or token (optional)',
},
}

def get_conn(self) -> FileService:
"""Return the FileService object."""
if not self._conn:
conn = self.get_connection(self.conn_id)
service_options = conn.extra_dejson
self._conn = FileService(account_name=conn.login, account_key=conn.password, **service_options)
prefix = "extra__azure_fileshare__"
if self._conn:
return self._conn
conn = self.get_connection(self.conn_id)
service_options_with_prefix = conn.extra_dejson
service_options = {}
for key, value in service_options_with_prefix.items():
# in case dedicated FileShareHook is used, the connection will use the extras from UI.
# in case deprecated wasb hook is used, the old extras will work as well
if key.startswith(prefix):
if value != '':
service_options[key[len(prefix) :]] = value
else:
# warn if the deprecated wasb_connection is used
warnings.warn(
"You are using deprecated connection for AzureFileShareHook."
" Please change it to `Azure FileShare`.",
DeprecationWarning,
)
else:
service_options[key] = value
# warn if the old non-prefixed value is used
warnings.warn(
"You are using deprecated connection for AzureFileShareHook."
" Please change it to `Azure FileShare`.",
DeprecationWarning,
)
self._conn = FileService(account_name=conn.login, account_key=conn.password, **service_options)
return self._conn

def check_for_directory(self, share_name: str, directory_name: str, **kwargs) -> bool:
@@ -119,7 +119,7 @@ class AzureContainerInstancesOperator(BaseOperator):
"POSTGRES_PASSWORD": "{{ macros.connection('postgres_default').password }}",
"JOB_GUID": "{{ ti.xcom_pull(task_ids='task1', key='guid') }}" },
secured_variables = ['POSTGRES_PASSWORD'],
volumes = [("azure_wasb_conn_id",
volumes = [("azure_container_instance_conn_id",
"my_storage_container",
"my_fileshare",
"/input-data",
2 changes: 2 additions & 0 deletions airflow/providers/microsoft/azure/provider.yaml
@@ -155,6 +155,8 @@ hook-class-names:
- airflow.providers.microsoft.azure.hooks.azure_batch.AzureBatchHook
- airflow.providers.microsoft.azure.hooks.azure_cosmos.AzureCosmosDBHook
- airflow.providers.microsoft.azure.hooks.azure_data_lake.AzureDataLakeHook
- airflow.providers.microsoft.azure.hooks.azure_fileshare.AzureFileShareHook
- airflow.providers.microsoft.azure.hooks.azure_container_volume.AzureContainerVolumeHook
- airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook
- airflow.providers.microsoft.azure.hooks.wasb.WasbHook
- airflow.providers.microsoft.azure.hooks.azure_data_factory.AzureDataFactoryHook
@@ -0,0 +1,75 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/connection:azure_container_volume:

Microsoft Azure Container Volume Connection
===========================================

The Microsoft Azure Container Volume connection type enables the Azure Container Volume Integrations.

Authenticating to Azure Container Volume
----------------------------------------

There are two ways to connect to Azure Container Volume using Airflow.

1. Use `token credentials
<https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=cmd#authenticate-with-token-credentials>`_
i.e. add specific credentials (client_id, secret) and subscription id to the Airflow connection.
2. Use a `Connection String
<https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/storage>`_
i.e. add connection string to ``extra__azure_container_volume__connection_string`` in the Airflow connection.

Only one authorization method can be used at a time. If you need to manage multiple credentials or keys then you should
configure multiple connections.

Default Connection IDs
----------------------

All hooks and operators related to Azure Container Volume use ``azure_container_volume_default`` by default.

Configuring the Connection
--------------------------

Login (optional)
Specify the login used for azure blob storage. For use with Shared Key Credential and SAS Token authentication.

Password (optional)
Specify the password used for azure blob storage. For use with
Active Directory (token credential) and shared key authentication.

Host (optional)
Specify the account url for anonymous public read, Active Directory, shared access key authentication.

Extra (optional)
Specify the extra parameters (as json dictionary) that can be used in Azure connection.
The following parameters are all optional:

* ``extra__azure_container_volume__connection_string``: Connection string for use with connection string authentication.

When specifying the connection in environment variable you should specify
it using URI syntax.

Note that all components of the URI should be URL-encoded.

For example connect with token credentials:

.. code-block:: bash
export AIRFLOW_CONN_WASP_DEFAULT='azure_container_volume://blob%20username:blob%[email protected]'
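
Because every URI component must be URL-encoded, it can help to build the value programmatically. The sketch below uses only the standard library; `build_conn_uri` is a hypothetical helper, and the login, password and host are placeholder values rather than anything from this commit.

```python
from urllib.parse import quote


def build_conn_uri(conn_type: str, login: str, password: str, host: str) -> str:
    """Hypothetical helper: assemble a connection URI with URL-encoded credentials."""
    # safe="" also encodes '/', ':' and '@' if they appear inside the credentials.
    return f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}@{host}"


# Placeholder values for illustration only.
uri = build_conn_uri(
    "azure_container_volume", "blob username", "blob password", "myblob.example.com"
)
print(uri)
```

Airflow reads connections from environment variables named `AIRFLOW_CONN_` followed by the upper-cased connection id, so a value built this way for the default id would go into `AIRFLOW_CONN_AZURE_CONTAINER_VOLUME_DEFAULT`.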
