Skip to content

Commit

Permalink
[AIRFLOW-6894] Prevent db query in example_dags (#7516)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Feb 24, 2020
1 parent a812f48 commit dcf8743
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/operators/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ def __init__(self,
self.gcp_cloudsql_conn_id = gcp_cloudsql_conn_id
self.autocommit = autocommit
self.parameters = parameters
self.gcp_connection = BaseHook.get_connection(self.gcp_conn_id)
self.gcp_connection = None

def _execute_query(self, hook: CloudSQLDatabaseHook, database_hook: Union[PostgresHook, MySqlHook]):
cloud_sql_proxy_runner = None
Expand All @@ -827,6 +827,7 @@ def _execute_query(self, hook: CloudSQLDatabaseHook, database_hook: Union[Postgr
cloud_sql_proxy_runner.stop_proxy()

def execute(self, context):
self.gcp_connection = BaseHook.get_connection(self.gcp_conn_id)
hook = CloudSQLDatabaseHook(
gcp_cloudsql_conn_id=self.gcp_cloudsql_conn_id,
gcp_conn_id=self.gcp_conn_id,
Expand Down
21 changes: 21 additions & 0 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
from glob import glob

from airflow.models import DagBag
from tests.test_utils.asserts import assert_queries_count

ROOT_FOLDER = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)
)

NO_DB_QUERY_EXCEPTION = [
"/airflow/example_dags/example_subdag_operator.py"
]


class TestExampleDags(unittest.TestCase):
def test_should_be_importable(self):
Expand All @@ -38,3 +43,19 @@ def test_should_be_importable(self):
)
self.assertEqual(0, len(dagbag.import_errors), f"import_errors={str(dagbag.import_errors)}")
self.assertGreaterEqual(len(dagbag.dag_ids), 1)

def test_should_not_do_database_queries(self):
example_dags = glob(f"{ROOT_FOLDER}/airflow/**/example_dags/example_*.py", recursive=True)
example_dags = [
dag_file
for dag_file in example_dags
if any(not dag_file.endswith(e) for e in NO_DB_QUERY_EXCEPTION)
]
for filepath in example_dags:
relative_filepath = os.path.relpath(filepath, ROOT_FOLDER)
with self.subTest(f"File {relative_filepath} shouldn't do database queries"):
with assert_queries_count(0):
DagBag(
dag_folder=filepath,
include_examples=False,
)
44 changes: 44 additions & 0 deletions tests/test_utils/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,53 @@
# under the License.

import re
from contextlib import contextmanager

from sqlalchemy import event

from airflow.settings import engine


def assert_equal_ignore_multiple_spaces(case, first, second, msg=None):
def _trim(s):
return re.sub(r"\s+", " ", s.strip())
return case.assertEqual(_trim(first), _trim(second), msg)


class CountQueriesResult:
def __init__(self):
self.count = 0


class CountQueries:
"""
Counts the number of queries sent to Airflow Database in a given context.
Does not support multiple processes. When a new process is started in context, its queries will
not be included.
"""
def __init__(self):
self.result = CountQueriesResult()

def __enter__(self):
event.listen(engine, "after_cursor_execute", self.after_cursor_execute)
return self.result

def __exit__(self, type_, value, traceback):
event.remove(engine, "after_cursor_execute", self.after_cursor_execute)

def after_cursor_execute(self, *args, **kwargs):
self.result.count += 1


count_queries = CountQueries # pylint: disable=invalid-name


@contextmanager
def assert_queries_count(expected_count, message_fmt=None):
with count_queries() as result:
yield None
message_fmt = message_fmt or "The expected number of db queries is {expected_count}. " \
"The current number is {current_count}."
message = message_fmt.format(current_count=result.count, expected_count=expected_count)
assert expected_count == result.count, message

0 comments on commit dcf8743

Please sign in to comment.