Adds mechanism for provider package discovery. (#12383)
This is a simple mechanism that will allow us to dynamically
discover and register all provider packages in the Airflow core.

Closes: #11422
potiuk committed Nov 17, 2020
1 parent 525f659 commit 2c0920f
Showing 12 changed files with 224 additions and 25 deletions.
29 changes: 15 additions & 14 deletions BREEZE.rst
@@ -2006,20 +2006,21 @@ This is the current syntax for `./breeze <./breeze>`_:
Run selected static checks for currently changed files. You should specify the static check
that you would like to run, or 'all' to run all checks. One of:
all all-but-pylint airflow-config-yaml airflow-providers-available base-operator
bats-tests bats-in-container-tests black build build-providers-dependencies
check-apache-license check-builtin-literals check-executables-have-shebangs
check-hooks-apply check-integrations check-merge-conflict check-xml
consistent-pylint daysago-import-check debug-statements detect-private-key doctoc
dont-use-safe-filter end-of-file-fixer fix-encoding-pragma flake8 forbid-tabs
helm-lint incorrect-use-of-LoggingMixin insert-license isort language-matters
lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm
no-providers-in-core-examples no-relative-imports pre-commit-descriptions
provide-create-sessions providers-init-file provider-yamls pydevd pydocstyle pylint
pylint-tests python-no-log-warn pyupgrade restrict-start_date rst-backticks
setup-order setup-extra-packages shellcheck sort-in-the-wild stylelint
trailing-whitespace update-breeze-file update-extras update-local-yml-file
update-setup-cfg-file version-sync yamllint
all all-but-pylint airflow-config-yaml airflow-providers-available
airflow-provider-yaml-files-ok base-operator bats-tests bats-in-container-tests
black build build-providers-dependencies check-apache-license check-builtin-literals
check-executables-have-shebangs check-hooks-apply check-integrations
check-merge-conflict check-xml consistent-pylint daysago-import-check
debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer
fix-encoding-pragma flake8 forbid-tabs helm-lint incorrect-use-of-LoggingMixin
insert-license isort language-matters lint-dockerfile lint-openapi markdownlint
mermaid mixed-line-ending mypy mypy-helm no-providers-in-core-examples
no-relative-imports pre-commit-descriptions provide-create-sessions
providers-init-file provider-yamls pydevd pydocstyle pylint pylint-tests
python-no-log-warn pyupgrade restrict-start_date rst-backticks setup-order
setup-extra-packages shellcheck sort-in-the-wild stylelint trailing-whitespace
update-breeze-file update-extras update-local-yml-file update-setup-cfg-file
version-sync yamllint
You can pass extra arguments, including options, to the pre-commit framework as
<EXTRA_ARGS> passed after --. For example:
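For instance, a sketch of running just the new check, passing a pre-commit option after -- (the static-check subcommand name is assumed from this help section; --all-files is a standard pre-commit flag):

    ./breeze static-check airflow-provider-yaml-files-ok -- --all-files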
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -32,5 +32,6 @@ global-exclude __pycache__ *.pyc
include airflow/alembic.ini
include airflow/api_connexion/openapi/v1.yaml
include airflow/git_version
include airflow/provider.yaml.schema.json
include airflow/serialization/schema.json
include airflow/utils/python_virtualenv_script.jinja2
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
@@ -50,6 +50,8 @@ require Breeze Docker images to be installed locally:
----------------------------------- ---------------------------------------------------------------- ------------
``airflow-providers-available`` Checks that providers are properly declared by extras
----------------------------------- ---------------------------------------------------------------- ------------
``airflow-provider-yaml-files-ok`` Checks that provider YAML files are valid
----------------------------------- ---------------------------------------------------------------- ------------
``base-operator`` Checks that BaseOperator is imported properly
----------------------------------- ---------------------------------------------------------------- ------------
``bats-tests`` Runs BATS bash unit tests
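The new entry corresponds to a pre-commit hook, so it can also be invoked directly through the pre-commit framework; a sketch, assuming the hook id matches the check name above:

    pre-commit run airflow-provider-yaml-files-ok --all-files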
File renamed without changes: dev/provider.yaml.schema.json → airflow/provider.yaml.schema.json (the schema now ships inside the airflow package).
3 changes: 3 additions & 0 deletions airflow/providers/google/provider.yaml
@@ -382,6 +382,9 @@ sensors:
- integration-name: Google Cloud Storage Transfer Service
python-modules:
- airflow.providers.google.cloud.sensors.cloud_storage_transfer_service
- integration-name: Google Dataflow
python-modules:
- airflow.providers.google.cloud.sensors.dataflow
- integration-name: Google Dataproc
python-modules:
- airflow.providers.google.cloud.sensors.dataproc
85 changes: 85 additions & 0 deletions airflow/providers_manager.py
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Manages all providers."""
import importlib
import json
import logging
import pkgutil
import traceback
from typing import Dict, List

import jsonschema
import yaml

try:
    import importlib.resources as importlib_resources
except ImportError:
    # Fall back to the importlib_resources backport on Python < 3.7.
    import importlib_resources


log = logging.getLogger(__name__)


def _load_schema() -> Dict:
    return json.loads(importlib_resources.read_text('airflow', 'provider.yaml.schema.json'))


class ProvidersManager:
    """Manages all provider packages."""

    def __init__(self):
        self._provider_directory = {}
        try:
            from airflow import providers
        except ImportError as e:
            log.warning("No providers are present or an error occurred when importing them: %s", e)
            return
        self._schema = _load_schema()
        self.__find_all_providers(providers.__path__)

    def __find_all_providers(self, paths: List[str]):
        def onerror(_):
            exception_string = traceback.format_exc()
            log.warning(exception_string)

        for module_info in pkgutil.walk_packages(paths, prefix="airflow.providers.", onerror=onerror):
            try:
                imported_module = importlib.import_module(module_info.name)
            except Exception as e:  # noqa pylint: disable=broad-except
                log.warning("Error when importing %s: %s", module_info.name, e)
                continue
            try:
                provider = importlib_resources.read_text(imported_module, 'provider.yaml')
                provider_info = yaml.safe_load(provider)
                jsonschema.validate(provider_info, schema=self._schema)
                self._provider_directory[provider_info['package-name']] = provider_info
            except FileNotFoundError:
                # This is OK - this is not a provider package
                pass
            except TypeError as e:
                if "is not a package" not in str(e):
                    log.warning("Error when loading 'provider.yaml' file from %s: %s", module_info.name, e)
                # Otherwise this is OK - this is a plain module rather than a package
            except Exception as e:  # noqa pylint: disable=broad-except
                log.warning("Error when loading 'provider.yaml' file from %s: %s", module_info.name, e)

    @property
    def providers(self):
        """Returns information about available providers."""
        return self._provider_directory
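A minimal usage sketch (assuming provider packages are installed and importable under airflow.providers): the directory is built eagerly in the constructor, and the providers property exposes the parsed, schema-validated provider.yaml data keyed by package name.

    from airflow.providers_manager import ProvidersManager

    manager = ProvidersManager()  # walks airflow.providers.* and reads each provider.yaml
    for package_name, provider_info in manager.providers.items():
        # provider_info is the parsed provider.yaml dictionary, validated against the schema
        print(package_name)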
1 change: 1 addition & 0 deletions breeze-complete
@@ -69,6 +69,7 @@ all
all-but-pylint
airflow-config-yaml
airflow-providers-available
airflow-provider-yaml-files-ok
base-operator
bats-tests
bats-in-container-tests
2 changes: 2 additions & 0 deletions dev/provider_packages/MANIFEST_TEMPLATE.in.jinja2
@@ -27,6 +27,8 @@ include airflow/providers/google/cloud/example_dags/*.sql
include airflow/providers/papermill/example_dags/*.ipynb
{% endif %}

include airflow/providers/{{ PROVIDER_PATH }}/provider.yaml

include NOTICE
include LICENSE
include CHANGELOG.txt
2 changes: 1 addition & 1 deletion docs/exts/provider_yaml_utils.py
@@ -24,7 +24,7 @@
import yaml

ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "dev", "provider.yaml.schema.json")
PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json")


def _load_schema() -> Dict[str, Any]:
32 changes: 22 additions & 10 deletions scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
@@ -23,7 +23,7 @@
from collections import Counter
from glob import glob
from itertools import chain, product
from typing import Any, Dict
from typing import Any, Dict, Iterable

import jsonschema
import yaml
@@ -36,7 +36,7 @@

ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir))
DOCS_DIR = os.path.join(ROOT_DIR, 'docs')
PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "dev", "provider.yaml.schema.json")
PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json")
CORE_INTEGRATIONS = ["SQL", "Local"]


Expand All @@ -53,10 +53,10 @@ def _load_schema() -> Dict[str, Any]:
return content


def _load_package_data():
def _load_package_data(package_paths: Iterable[str]):
schema = _load_schema()
result = {}
for provider_yaml_path in sorted(glob(f"{ROOT_DIR}/airflow/providers/**/provider.yaml", recursive=True)):
for provider_yaml_path in package_paths:
with open(provider_yaml_path) as yaml_file:
provider = yaml.safe_load(yaml_file)
rel_path = os.path.relpath(provider_yaml_path, ROOT_DIR)
@@ -259,10 +259,22 @@ def check_doc_files(yaml_files: Dict[str, Dict]):
        sys.exit(1)


all_yaml_files: Dict[str, Dict] = _load_package_data()

check_integration_duplicates(all_yaml_files)
check_completeness_of_list_of_hooks_sensors_hooks(all_yaml_files)
check_completeness_of_list_of_transfers(all_yaml_files)
check_invalid_integration(all_yaml_files)
check_doc_files(all_yaml_files)
if __name__ == '__main__':
    all_provider_files = sorted(glob(f"{ROOT_DIR}/airflow/providers/**/provider.yaml", recursive=True))
    if len(sys.argv) > 1:
        paths = sorted(sys.argv[1:])
    else:
        paths = all_provider_files

    all_parsed_yaml_files: Dict[str, Dict] = _load_package_data(paths)

    all_files_loaded = len(all_provider_files) == len(paths)
    check_integration_duplicates(all_parsed_yaml_files)

    check_completeness_of_list_of_hooks_sensors_hooks(all_parsed_yaml_files)
    check_completeness_of_list_of_transfers(all_parsed_yaml_files)

    if all_files_loaded:
        # Only check those if all provider files are loaded
        check_doc_files(all_parsed_yaml_files)
        check_invalid_integration(all_parsed_yaml_files)
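With the __main__ guard in place, the checker can be run standalone against a subset of files; a sketch (the provider.yaml path is illustrative):

    python scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py \
        airflow/providers/google/provider.yaml

When explicit paths are given, check_doc_files and check_invalid_integration are skipped, since they are only meaningful when every provider.yaml has been loaded.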
1 change: 1 addition & 0 deletions setup.py
@@ -837,6 +837,7 @@ def is_package_excluded(package: str, exclusion_list: List[str]):
'graphviz>=0.12',
'gunicorn>=19.5.0, <20.0',
'iso8601>=0.1.12',
'importlib_resources; python_version<"3.7"',
'jinja2>=2.10.1, <2.12.0',
'json-merge-patch==0.2',
'jsonschema~=3.0',
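The PEP 508 environment marker restricts the importlib_resources backport to Python versions below 3.7; on newer interpreters the standard library's importlib.resources is used instead, matching the try/except import fallback in airflow/providers_manager.py above.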
91 changes: 91 additions & 0 deletions tests/core/test_providers_manager.py
@@ -0,0 +1,91 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest

from airflow.providers_manager import ProvidersManager

ALL_PROVIDERS = [
'apache-airflow-providers-amazon',
'apache-airflow-providers-apache-cassandra',
'apache-airflow-providers-apache-druid',
'apache-airflow-providers-apache-hdfs',
'apache-airflow-providers-apache-hive',
'apache-airflow-providers-apache-kylin',
'apache-airflow-providers-apache-livy',
'apache-airflow-providers-apache-pig',
'apache-airflow-providers-apache-pinot',
'apache-airflow-providers-apache-spark',
'apache-airflow-providers-apache-sqoop',
'apache-airflow-providers-celery',
'apache-airflow-providers-cloudant',
'apache-airflow-providers-cncf-kubernetes',
'apache-airflow-providers-databricks',
'apache-airflow-providers-datadog',
'apache-airflow-providers-dingding',
'apache-airflow-providers-discord',
'apache-airflow-providers-docker',
'apache-airflow-providers-elasticsearch',
'apache-airflow-providers-exasol',
'apache-airflow-providers-facebook',
'apache-airflow-providers-ftp',
'apache-airflow-providers-google',
'apache-airflow-providers-grpc',
'apache-airflow-providers-hashicorp',
'apache-airflow-providers-http',
'apache-airflow-providers-imap',
'apache-airflow-providers-jdbc',
'apache-airflow-providers-jenkins',
'apache-airflow-providers-jira',
'apache-airflow-providers-microsoft-azure',
'apache-airflow-providers-microsoft-mssql',
'apache-airflow-providers-microsoft-winrm',
'apache-airflow-providers-mongo',
'apache-airflow-providers-mysql',
'apache-airflow-providers-odbc',
'apache-airflow-providers-openfaas',
'apache-airflow-providers-opsgenie',
'apache-airflow-providers-oracle',
'apache-airflow-providers-pagerduty',
'apache-airflow-providers-papermill',
'apache-airflow-providers-plexus',
'apache-airflow-providers-postgres',
'apache-airflow-providers-presto',
'apache-airflow-providers-qubole',
'apache-airflow-providers-redis',
'apache-airflow-providers-salesforce',
'apache-airflow-providers-samba',
'apache-airflow-providers-segment',
'apache-airflow-providers-sendgrid',
'apache-airflow-providers-sftp',
'apache-airflow-providers-singularity',
'apache-airflow-providers-slack',
'apache-airflow-providers-snowflake',
'apache-airflow-providers-sqlite',
'apache-airflow-providers-ssh',
'apache-airflow-providers-vertica',
'apache-airflow-providers-yandex',
'apache-airflow-providers-zendesk',
]


class TestProviderManager(unittest.TestCase):
    def test_providers_are_loaded(self):
        provider_manager = ProvidersManager()
        provider_list = list(provider_manager.providers.keys())
        provider_list.sort()
        self.assertEqual(ALL_PROVIDERS, provider_list)
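A sketch of running the new test module directly (assuming a development environment, such as Breeze, with all provider packages installed):

    pytest tests/core/test_providers_manager.py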
