
Commit

All classes in backport providers are now importable in Airflow 1.10 (#8991)

* All classes in backport providers are now importable in Airflow 1.10

* fixup! All classes in backport providers are now importable in Airflow 1.10

* fixup! fixup! All classes in backport providers are now importable in Airflow 1.10
potiuk committed May 26, 2020
1 parent 14fb585 commit cdb3f25
Showing 22 changed files with 1,090 additions and 330 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -256,7 +256,8 @@ metastore_browser/templates/.*\\.html$|.*\\.jinja2"
^airflow/operators/.*$|
^airflow/sensors/.*$|
^airflow/providers/.*$|
^airflow/contrib/.*$
^airflow/contrib/.*$|
^backport_packages/.*$
- id: base-operator
language: pygrep
name: Make sure BaseOperator[Link] is imported from airflow.models outside of core
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
@@ -672,7 +672,7 @@
version_added: ~
type: string
example: ~
default: "Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}"
default: ~
- name: webserver
description: ~
options:
2 changes: 1 addition & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -337,7 +337,7 @@ default_hive_mapred_queue =

# Template for mapred_job_name in HiveOperator, supports the following named parameters
# hostname, dag_id, task_id, execution_date
mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}
# mapred_job_name_template =

[webserver]
# The base url of your website as airflow cannot guess what domain or
1 change: 0 additions & 1 deletion airflow/config_templates/default_test.cfg
@@ -66,7 +66,6 @@ default_owner = airflow

[hive]
default_hive_mapred_queue = airflow
mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}

[webserver]
base_url = http://localhost:8080
5 changes: 3 additions & 2 deletions airflow/providers/apache/hive/operators/hive.py
@@ -95,8 +95,9 @@ def __init__(
        self.mapred_queue = mapred_queue
        self.mapred_queue_priority = mapred_queue_priority
        self.mapred_job_name = mapred_job_name
        self.mapred_job_name_template = conf.get('hive',
                                                 'mapred_job_name_template')
        self.mapred_job_name_template = conf.get(
            'hive', 'mapred_job_name_template',
            fallback="Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}")

        # assigned lazily - just for consistency we can create the attribute with a
        # `None` initial value, later it will be populated by the execute method.
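Taken together with the config template changes above (the shipped `mapred_job_name_template` default is removed), the operator now supplies the default itself via a `fallback` argument. A minimal sketch of that pattern; the section, option, and fallback string come from the diff, while the formatted values are made up:

```python
from airflow.configuration import conf

# With a fallback, a missing [hive] mapred_job_name_template option no longer
# raises a configuration error - the fallback string is returned instead.
template = conf.get(
    'hive', 'mapred_job_name_template',
    fallback="Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}")

# The operator later formats the template with these named parameters
# (hypothetical values shown here).
print(template.format(
    hostname="worker-1",
    dag_id="example_dag",
    task_id="example_task",
    execution_date="2020-05-26T00:00:00+00:00",
))
```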
@@ -22,7 +22,6 @@
from google.cloud.datacatalog_v1beta1.proto.tags_pb2 import FieldType, TagField, TagTemplateField

from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.datacatalog import (
    CloudDataCatalogCreateEntryGroupOperator, CloudDataCatalogCreateEntryOperator,
@@ -38,6 +37,7 @@
    CloudDataCatalogUpdateTagTemplateOperator,
)
from airflow.utils.dates import days_ago
from airflow.utils.helpers import chain

default_args = {"start_date": days_ago(1)}

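One consequence of the import change in the example DAG above: on Airflow 1.10 (and therefore in the backport packages) `chain` lives in `airflow.utils.helpers`, not `airflow.models.baseoperator`. A short sketch of how `chain` is typically used; the DAG id, task ids, and commands are hypothetical:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.helpers import chain

with DAG(dag_id="example_chain", default_args={"start_date": days_ago(1)}) as dag:
    first = BashOperator(task_id="first", bash_command="echo first")
    second = BashOperator(task_id="second", bash_command="echo second")
    third = BashOperator(task_id="third", bash_command="echo third")

    # chain(first, second, third) is equivalent to first >> second >> third
    chain(first, second, third)
```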
24 changes: 24 additions & 0 deletions airflow/providers/papermill/ADDITIONAL_INFO.md
@@ -0,0 +1,24 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Additional notes

The Papermill operator is currently the only operator that works with AUTO inlets (for lineage support).
However, since AUTO inlets are a feature of Airflow 2 and are not backported,
they are not supported when back-porting to Airflow 1.10.
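A hedged illustration of the note above: on the Airflow 1.10 backport the notebooks are passed to the operator explicitly, since AUTO inlets (Airflow 2 lineage) are not available. The DAG id, paths, and parameters are hypothetical:

```python
from airflow import DAG
from airflow.providers.papermill.operators.papermill import PapermillOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="example_papermill_backport", default_args={"start_date": days_ago(1)}) as dag:
    run_notebook = PapermillOperator(
        task_id="run_example_notebook",
        # Explicit input/output notebooks; nothing is inferred via lineage on 1.10.
        input_nb="/tmp/hello_world.ipynb",
        output_nb="/tmp/out-{{ execution_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
    )
```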
97 changes: 97 additions & 0 deletions backport_packages/import_all_provider_classes.py
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import importlib
import os
import sys
import traceback
from inspect import isclass
from typing import List


def import_all_provider_classes(source_path: str,
                                provider_ids: List[str] = None,
                                print_imports: bool = False) -> List[str]:
    """
    Imports all classes in providers packages. This method loads and imports
    all the classes found in providers, so that we can find all the subclasses
    of operators/sensors etc.

    :param provider_ids - provider ids that should be loaded.
    :param print_imports - if imported class should also be printed in output
    :param source_path: path to look for sources - might be None to look for all packages in all source paths
    :return: list of all imported classes
    """
    if provider_ids:
        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
                                   for provider_id in provider_ids]
    else:
        prefixed_provider_paths = [source_path + "/airflow/providers/"]

    imported_classes = []
    tracebacks = []
    for root, dirs, files in os.walk(source_path):
        if all([not root.startswith(prefix_provider_path)
                for prefix_provider_path in prefixed_provider_paths]) or root.endswith("__pycache__"):
            # Skip loading module if it is not in the list of providers that we are looking for
            continue
        package_name = root[len(source_path) + 1:].replace("/", ".")
        for file in files:
            if file.endswith(".py"):
                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
                if print_imports:
                    print(f"Importing module: {module_name}")
                # noinspection PyBroadException
                try:
                    _module = importlib.import_module(module_name)
                    for attribute_name in dir(_module):
                        class_name = module_name + "." + attribute_name
                        attribute = getattr(_module, attribute_name)
                        if isclass(attribute):
                            if print_imports:
                                print(f"Imported {class_name}")
                            imported_classes.append(class_name)
                except Exception:
                    exception_str = traceback.format_exc()
                    tracebacks.append(exception_str)
    if tracebacks:
        print("""
ERROR: There were some import errors
""", file=sys.stderr)
        for trace in tracebacks:
            print("----------------------------------------", file=sys.stderr)
            print(trace, file=sys.stderr)
            print("----------------------------------------", file=sys.stderr)
        sys.exit(1)
    else:
        return imported_classes


if __name__ == '__main__':
    install_source_path = None
    for python_path_candidate in sys.path:
        providers_path_candidate = os.path.join(python_path_candidate, "airflow", "providers")
        if os.path.isdir(providers_path_candidate):
            install_source_path = python_path_candidate
    print()
    print(f"Walking all paths in {install_source_path}")
    print()
    import_all_provider_classes(print_imports=True, source_path=install_source_path)
    print()
    print("SUCCESS: All backport packages are importable!")
    print()
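For reference, a hedged usage sketch of the helper above when called programmatically rather than as a script; it assumes the script's directory is on the Python path, and the source path and provider ids are hypothetical:

```python
from import_all_provider_classes import import_all_provider_classes

# Import only selected providers from a source tree; provider ids map to
# directories under airflow/providers (e.g. "apache.hive" -> apache/hive).
imported = import_all_provider_classes(
    source_path="/opt/airflow-sources",
    provider_ids=["apache.hive", "google.cloud"],
    print_imports=True,
)
print(f"Imported {len(imported)} classes")
```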
