Commit
[AIRFLOW-6980] Improve system tests and building providers package (#7615)

This PR removes initdb from the system tests setup, as it seems to be an unnecessary operation.
Also, some automatic code changes have been added before building the providers package.

turbaszek committed Mar 10, 2020
1 parent 2f42fa5 commit 1f77f94
Showing 9 changed files with 111 additions and 112 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -126,7 +126,7 @@ jobs:
script: "./scripts/ci/ci_run_airflow_testing.sh --ignore=tests/providers"
stage: test
- name: "Prepare backport packages"
before_install: echo
before_install: pip install bowler
stage: test
script: ./scripts/ci/ci_prepare_backport_packages.sh
before_install:
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/example_dags/example_dlp.py
@@ -28,7 +28,7 @@

from google.cloud.dlp_v2.types import ContentItem, InspectConfig, InspectTemplate

from airflow import DAG
from airflow import models
from airflow.providers.google.cloud.operators.dlp import (
CloudDLPCreateInspectTemplateOperator, CloudDLPDeleteInspectTemplateOperator,
CloudDLPInspectContentOperator,
@@ -52,10 +52,10 @@
INSPECT_TEMPLATE = InspectTemplate(inspect_config=INSPECT_CONFIG)


with DAG(
with models.DAG(
"example_gcp_dlp",
default_args=default_args,
schedule_interval=None,
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag:
create_template = CloudDLPCreateInspectTemplateOperator(
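The switch from DAG to models.DAG above is behavior-preserving, since both names resolve to the same class, and an attribute-style call like models.DAG(...) is the shape that the select_method("DAG") query added below in setup_backport_packages.py matches. A quick sanity check, assuming Airflow is importable; this snippet is illustrative and not part of the commit:

    # airflow re-exports DAG from airflow.models, so both names should point
    # at the same class and the example DAGs behave identically.
    from airflow import DAG, models

    assert models.DAG is DAG
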
5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/example_dags/example_stackdriver.py
@@ -22,7 +22,7 @@

import json

from airflow.models import DAG
from airflow import models
from airflow.providers.google.cloud.operators.stackdriver import (
StackdriverDeleteAlertOperator, StackdriverDeleteNotificationChannelOperator,
StackdriverDisableAlertPoliciesOperator, StackdriverDisableNotificationChannelsOperator,
@@ -108,9 +108,10 @@

default_args = {"start_date": days_ago(1)}

with DAG(
with models.DAG(
'example_stackdriver',
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example']
) as dag:
# [START howto_operator_gcp_stackdriver_upsert_notification_channel]
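As the new inline comment says, schedule_interval=None means the example DAG runs only when triggered manually. A hypothetical override for a scheduled run, with the '@daily' value chosen purely for illustration:

    # Not part of the commit: run the example once a day instead of
    # only on manual triggers.
    with models.DAG(
        'example_stackdriver',
        default_args=default_args,
        schedule_interval='@daily',
        tags=['example'],
    ) as dag:
        ...
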
9 changes: 7 additions & 2 deletions airflow/providers/google/cloud/example_dags/example_tasks.py
@@ -29,7 +29,7 @@
from google.cloud.tasks_v2.types import Queue
from google.protobuf import timestamp_pb2

from airflow import DAG
from airflow import models
from airflow.providers.google.cloud.operators.tasks import (
CloudTasksQueueCreateOperator, CloudTasksTaskCreateOperator, CloudTasksTaskRunOperator,
)
@@ -53,7 +53,12 @@
"schedule_time": timestamp,
}

with DAG("example_gcp_tasks", default_args=default_args, schedule_interval=None, tags=['example'],) as dag:
with models.DAG(
"example_gcp_tasks",
default_args=default_args,
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag:

create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
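For illustration, once the remove_tags_modifier pass introduced below in setup_backport_packages.py has run, the backported copy of this DAG header should look roughly like the following sketch, assuming the modifier strips only the tags argument and its trailing comma:

    # Hypothetical post-refactor result in the Airflow 1.10 backport package;
    # the tags kwarg is stripped for compatibility with older 1.10 releases.
    with models.DAG(
        "example_gcp_tasks",
        default_args=default_args,
        schedule_interval=None,  # Override to match your needs
    ) as dag:
        ...
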
82 changes: 79 additions & 3 deletions backport_packages/setup_backport_packages.py
@@ -26,11 +26,15 @@
import textwrap
from importlib import util
from os.path import dirname
from shutil import copytree, rmtree
from shutil import copyfile, copytree, rmtree
from typing import Dict, List

from bowler import LN, TOKEN, BowlerTool, Capture, Filename, Query
from fissix.pytree import Leaf
from setuptools import Command, find_packages, setup as setuptools_setup

BowlerTool.IN_PROCESS = True

logger = logging.getLogger(__name__)

# Kept manually in sync with airflow.__version__
@@ -138,6 +142,71 @@ def run(self):
DEPENDENCIES_JSON_FILE = os.path.join(os.pardir, "airflow", "providers", "dependencies.json")


def change_import_paths_to_deprecated():
changes = [
("airflow.operators.bash", "airflow.operators.bash_operator"),
("airflow.operators.python", "airflow.operators.python_operator"),
("airflow.utils.session", "airflow.utils.db"),
]

qry = Query()
for new, old in changes:
qry.select_module(new).rename(old)

# Move and refactor imports for Dataflow
copyfile(
os.path.join(dirname(__file__), os.pardir, "airflow", "utils", "python_virtualenv.py"),
os.path.join(dirname(__file__), "airflow", "providers",
"google", "cloud", "utils", "python_virtualenv.py"
)
)
(
qry
.select_module("airflow.utils.python_virtualenv")
.rename("airflow.providers.google.cloud.utils.python_virtualenv")
)
copyfile(
os.path.join(dirname(__file__), os.pardir, "airflow", "utils", "process_utils.py"),
os.path.join(dirname(__file__), "airflow", "providers",
"google", "cloud", "utils", "process_utils.py"
)
)
(
qry
.select_module("airflow.utils.process_utils")
.rename("airflow.providers.google.cloud.utils.process_utils")
)

# Remove tags
qry.select_method("DAG").is_call().modify(remove_tags_modifier)

# Fix KubernetesPodOperator imports to use old path
qry.select_module(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod").rename(
"airflow.contrib.operators.kubernetes_pod_operator"
)

# Fix BaseOperatorLinks imports
files = r"bigquery\.py|mlengine\.py" # noqa
qry.select_module("airflow.models").is_filename(include=files).filter(pure_airflow_models_filter).rename(
"airflow.models.baseoperator")

qry.execute(write=True, silent=False, interactive=False)


def remove_tags_modifier(node: LN, capture: Capture, filename: Filename) -> None:
for node in capture['function_arguments'][0].post_order():
if isinstance(node, Leaf) and node.value == "tags" and node.type == TOKEN.NAME:
if node.parent.next_sibling and node.parent.next_sibling.value == ",":
node.parent.next_sibling.remove()
node.parent.remove()


def pure_airflow_models_filter(node: LN, capture: Capture, filename: Filename) -> bool:
"""Check if select is exactly [airflow, . , models]"""
return len([ch for ch in node.children[1].leaves()]) == 3


def copy_provider_sources():
build_dir = os.path.join(dirname(__file__), "build")
if os.path.isdir(build_dir):
@@ -153,9 +222,13 @@ def get_provider_package_name(provider_module: str):
return "apache-airflow-providers-" + provider_module.replace(".", "-")


def copy_and_refactor_sources():
copy_provider_sources()
change_import_paths_to_deprecated()


def do_setup_package_providers(provider_module: str, deps: List[str], extras: Dict[str, List[str]]):
setup.write_version()
copy_provider_sources()
provider_package_name = get_provider_package_name(provider_module)
package_name = f'{provider_package_name}' if provider_module != "providers" \
else f'apache-airflow-providers'
@@ -240,9 +313,12 @@ def usage():
possible_first_params.append(LIST_BACKPORT_PACKAGES)
if len(sys.argv) == 1:
print()
print("ERROR! Mising first param")
print("ERROR! Missing first param")
print()
usage()
elif sys.argv[1] == "prepare":
print("Copying sources and doing refactor")
copy_and_refactor_sources()
elif sys.argv[1] not in possible_first_params:
print()
print(f"ERROR! Wrong first param: {sys.argv[1]}")
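A minimal, self-contained sketch of the Bowler rename pattern that change_import_paths_to_deprecated builds on. It assumes bowler is installed; the demo file name and its import line are made up for illustration:

    from pathlib import Path

    from bowler import Query

    # Hypothetical input file containing one "new style" import.
    demo = Path("demo_dag.py")
    demo.write_text("from airflow.operators.bash import BashOperator\n")

    # Same select_module/rename/execute chain the setup script uses.
    (
        Query(str(demo))
        .select_module("airflow.operators.bash")
        .rename("airflow.operators.bash_operator")
        .execute(write=True, silent=True, interactive=False)
    )

    # The import should now read:
    # from airflow.operators.bash_operator import BashOperator
    print(demo.read_text())
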
3 changes: 2 additions & 1 deletion scripts/ci/ci_before_install.sh
@@ -23,4 +23,5 @@ set -x
build_image_on_ci

# We need newer version of six for Travis as they bundle 1.11.0 version
sudo pip install pre-commit 'six~=1.14'
# Bowler is installed for backport packages build
pip install pre-commit bowler 'six~=1.14'
6 changes: 6 additions & 0 deletions scripts/ci/ci_prepare_backport_packages.sh
@@ -27,6 +27,12 @@ rm -rf -- *.egg-info

BACKPORT_PACKAGES=$(python3 setup_backport_packages.py list-backport-packages)

echo "-----------------------------------------------------------------------------------"
echo " Copying sources and doing refactor for backporting"
echo "-----------------------------------------------------------------------------------"
echo
python3 setup_backport_packages.py prepare

for BACKPORT_PACKAGE in ${BACKPORT_PACKAGES}
do
echo
2 changes: 0 additions & 2 deletions scripts/ci/docker-compose/remove-sources.yml
@@ -24,5 +24,3 @@ services:
# Everything you install in Docker
volumes:
- ./empty:/opt/airflow/airflow:cached
# necessary for system tests - we need to take example dags from there
- ../../../airflow/providers:/providers:cached
108 changes: 10 additions & 98 deletions tests/test_utils/system_tests_class.py
@@ -17,10 +17,8 @@
# under the License.
import os
import shutil
from contextlib import ContextDecorator
import sys
from datetime import datetime
from shutil import move
from tempfile import mkdtemp
from unittest import TestCase

from airflow.configuration import AIRFLOW_HOME, AirflowConfigParser, get_airflow_config
Expand All @@ -34,20 +32,6 @@
DEFAULT_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")


def resolve_dags_folder() -> str:
"""
Returns DAG folder specified in current Airflow config.
"""
config_file = get_airflow_config(AIRFLOW_HOME)
conf = AirflowConfigParser()
conf.read(config_file)
try:
dags = conf.get("core", "dags_folder")
except AirflowException:
dags = os.path.join(AIRFLOW_HOME, 'dags')
return dags


def resolve_logs_folder() -> str:
"""
Returns LOGS folder specified in current Airflow config.
Expand All @@ -65,48 +49,6 @@ def resolve_logs_folder() -> str:
return logs


class EmptyDagsDirectory( # pylint: disable=invalid-name
ContextDecorator, LoggingMixin
):
"""
Context manager that temporally removes DAGs from provided directory.
"""

def __init__(self, dag_directory: str) -> None:
super().__init__()
self.dag_directory = dag_directory
self.temp_dir = mkdtemp()

def __enter__(self) -> str:
self._store_dags_to_temporary_directory(self.dag_directory, self.temp_dir)
return self.temp_dir

def __exit__(self, *args, **kwargs) -> None:
self._restore_dags_from_temporary_directory(self.dag_directory, self.temp_dir)

def _store_dags_to_temporary_directory(
self, dag_folder: str, temp_dir: str
) -> None:
self.log.info(
"Storing DAGS from %s to temporary directory %s", dag_folder, temp_dir
)
try:
os.mkdir(dag_folder)
except OSError:
pass
for file in os.listdir(dag_folder):
move(os.path.join(dag_folder, file), os.path.join(temp_dir, file))

def _restore_dags_from_temporary_directory(
self, dag_folder: str, temp_dir: str
) -> None:
self.log.info(
"Restoring DAGS to %s from temporary directory %s", dag_folder, temp_dir
)
for file in os.listdir(temp_dir):
move(os.path.join(temp_dir, file), os.path.join(dag_folder, file))


class SystemTest(TestCase, LoggingMixin):
@staticmethod
def execute_cmd(*args, **kwargs):
@@ -123,9 +65,6 @@ def setUp(self) -> None:
We also remove all logs from logs directory to have a clear log state and see only logs from this
test.
"""
dag_folder = resolve_dags_folder()
with EmptyDagsDirectory(dag_folder):
self.initial_db_init()
print()
print("Removing all log files except previous_runs")
print()
@@ -157,15 +96,8 @@ def tearDown(self) -> None:
shutil.move(file_path, target_dir)
super().setUp()

def initial_db_init(self):
if os.environ.get("RUN_AIRFLOW_1_10"):
print("Attempting to reset the db using airflow command")
os.system("airflow resetdb -y")
else:
from airflow.utils import db
db.resetdb()

def _print_all_log_files(self):
@staticmethod
def _print_all_log_files():
print()
print("Printing all log files")
print()
@@ -180,25 +112,6 @@ def _print_all_log_files(self):
with open(filepath, "r") as f:
print(f.read())

def correct_imports_for_airflow_1_10(self, directory):
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if filepath.endswith(".py"):
self.replace_airflow_1_10_imports(filepath)

def replace_airflow_1_10_imports(self, filepath):
replacements = [
("airflow.operators.bash", "airflow.operators.bash_operator"),
("airflow.operators.python", "airflow.operators.python_operator"),
]
with open(filepath, "rt") as file:
data = file.read()
for replacement in replacements:
data = data.replace(replacement[0], replacement[1])
with open(filepath, "wt") as file:
file.write(data)

def run_dag(self, dag_id: str, dag_folder: str = DEFAULT_DAG_FOLDER) -> None:
"""
Runs example dag by it's ID.
@@ -209,14 +122,13 @@ def run_dag(self, dag_id: str, dag_folder: str = DEFAULT_DAG_FOLDER) -> None:
:type dag_folder: str
"""
if os.environ.get("RUN_AIRFLOW_1_10"):
# For system tests purpose we are mounting airflow/providers to /providers folder
# So that we can get example_dags from there
dag_folder = dag_folder.replace("/opt/airflow/airflow/providers", "/providers")
temp_dir = mkdtemp()
os.rmdir(temp_dir)
shutil.copytree(dag_folder, temp_dir)
dag_folder = temp_dir
self.correct_imports_for_airflow_1_10(temp_dir)
# For system tests purpose we are changing airflow/providers
# to side packages path of the installed providers package
python = f"python{sys.version_info.major}.{sys.version_info.minor}"
dag_folder = dag_folder.replace(
"/opt/airflow/airflow/providers",
f"/usr/local/lib/{python}/site-packages/airflow/providers",
)
self.log.info("Looking for DAG: %s in %s", dag_id, dag_folder)
dag_bag = DagBag(dag_folder=dag_folder, include_examples=False)
dag = dag_bag.get_dag(dag_id)
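The mount-based folder swap removed above is replaced by a straight path rewrite onto the installed providers package. A small illustration of the computation run_dag now performs when RUN_AIRFLOW_1_10 is set, with the example folder below assumed:

    import sys

    # Point at the site-packages copy of the providers package instead of
    # the mounted source tree, matching the interpreter's major.minor version.
    python = f"python{sys.version_info.major}.{sys.version_info.minor}"
    dag_folder = "/opt/airflow/airflow/providers/google/cloud/example_dags"
    print(dag_folder.replace(
        "/opt/airflow/airflow/providers",
        f"/usr/local/lib/{python}/site-packages/airflow/providers",
    ))
    # e.g. /usr/local/lib/python3.6/site-packages/airflow/providers/google/cloud/example_dags
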
