Skip to content

Commit

Permalink
Use libyaml C library when available. (#14577)
Browse files Browse the repository at this point in the history
This makes loading local providers 1/3 quicker -- from 2s down from 3s
on my local SSD.

The `airflow.utils.yaml` module can be used in place of the normal yaml
module, with the bonus that `safe_load` will use libyaml where available
instead of always using the pure python version.

This shaves 3 minutes off the "WWW" tests - down to 8 minutes from
11 minutes.

I have not used this module in tests/docs code etc, as I don't want to
force importing `airflow` (and everything in currently brings in) in to
those contexts.
  • Loading branch information
ashb committed Mar 5, 2021
1 parent 09327ba commit 7daebef
Show file tree
Hide file tree
Showing 20 changed files with 138 additions and 19 deletions.
2 changes: 1 addition & 1 deletion airflow/cli/commands/connection_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from typing import Any, Dict, List
from urllib.parse import urlparse, urlunparse

import yaml
from sqlalchemy.orm import exc

import airflow.utils.yaml as yaml
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/kubernetes_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import os
import sys

import yaml
from kubernetes import client
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

import airflow.utils.yaml as yaml
from airflow.executors.kubernetes_executor import KubeConfig, create_pod_id
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/simple_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import json
from typing import Any, Callable, Dict, List, Optional, Union

import yaml
from rich.box import ASCII_DOUBLE_HEAD
from rich.console import Console
from rich.syntax import Syntax
from rich.table import Table
from tabulate import tabulate

import airflow.utils.yaml as yaml
from airflow.plugins_manager import PluginsDirectorySource
from airflow.utils.platform import is_tty

Expand Down
2 changes: 1 addition & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def default_config_yaml() -> dict:
:return: Python dictionary containing configs & their info
"""
import yaml
import airflow.utils.yaml as yaml

with open(_default_config_file_path('config.yml')) as config_file:
return yaml.safe_load(config_file)
Expand Down
2 changes: 1 addition & 1 deletion airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
from functools import reduce
from typing import List, Optional, Union

import yaml
from dateutil import parser
from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient

import airflow.utils.yaml as yaml
from airflow.exceptions import AirflowConfigException
from airflow.kubernetes.pod_generator_deprecated import PodGenerator as PodGeneratorDeprecated
from airflow.version import version as airflow_version
Expand Down
3 changes: 2 additions & 1 deletion airflow/kubernetes/refresh_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
from typing import Optional, cast

import pendulum
import yaml
from kubernetes.client import Configuration
from kubernetes.config.exec_provider import ExecProvider
from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader

import airflow.utils.yaml as yaml


def _parse_timestamp(ts_str: str) -> int:
parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
Expand Down
6 changes: 5 additions & 1 deletion airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
import tempfile
from typing import Any, Dict, Generator, Optional, Tuple, Union

import yaml
from cached_property import cached_property
from kubernetes import client, config, watch

try:
import airflow.utils.yaml as yaml
except ImportError:
import yaml

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import warnings
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple

import yaml
from kubernetes.client import CoreV1Api, models as k8s

try:
import airflow.utils.yaml as yaml
except ImportError:
import yaml

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.pod_generator import PodGenerator
Expand Down
5 changes: 4 additions & 1 deletion airflow/providers/google/cloud/operators/cloud_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from typing import Any, Dict, Optional, Sequence, Union
from urllib.parse import unquote, urlparse

import yaml
try:
import airflow.utils.yaml as yaml
except ImportError:
import yaml

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
from typing import Any, Dict, NamedTuple, Set

import jsonschema
import yaml
from wtforms import Field

import airflow.utils.yaml as yaml
from airflow.utils.entry_points import entry_points_with_dist

try:
Expand Down
3 changes: 1 addition & 2 deletions airflow/secrets/local_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
from json import JSONDecodeError
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

import yaml

import airflow.utils.yaml as yaml
from airflow.exceptions import (
AirflowException,
AirflowFileParseException,
Expand Down
76 changes: 76 additions & 0 deletions airflow/utils/yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Use libyaml for YAML dump/load operations where possible.
If libyaml is available we will use it -- it is significantly faster.
This module delegates all other properties to the yaml module, so it can be used as:
.. code-block:: python
import airflow.utils.yaml as yaml
And then be used directly in place of the normal python module.
"""
import sys
from typing import TYPE_CHECKING, Any, BinaryIO, TextIO, Union, cast

if TYPE_CHECKING:
from yaml.error import MarkedYAMLError # noqa


def safe_load(stream: Union[bytes, str, BinaryIO, TextIO]) -> Any:
"""Like yaml.safe_load, but use the C libyaml for speed where we can"""
# delay import until use.
from yaml import load as orig

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[no-redef]

return orig(stream, SafeLoader)


def dump(data: Any, **kwargs) -> str:
"""Like yaml.safe_dump, but use the C libyaml for speed where we can"""
# delay import until use.
from yaml import dump as orig

try:
from yaml import CSafeDumper as SafeDumper
except ImportError:
from yaml import SafeDumper # type: ignore[no-redef]

return cast(str, orig(data, Dumper=SafeDumper, **kwargs))


def __getattr__(name):
# Delegate anything else to the yaml module
import yaml

if name == "FullLoader":
# Try to use CFullLoader by default
getattr(yaml, "CFullLoader", yaml.FullLoader)

return getattr(yaml, name)


if sys.version_info < (3, 7):
from pep562 import Pep562

Pep562(__name__)
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import lazy_object_proxy
import nvd3
import sqlalchemy as sqla
import yaml
from flask import (
Markup,
Response,
Expand Down Expand Up @@ -66,6 +65,7 @@
from wtforms.validators import InputRequired

import airflow
import airflow.utils.yaml as yaml
from airflow import models, plugins_manager, settings
from airflow.api.common.experimental.mark_tasks import (
set_dag_run_state_to_failed,
Expand Down
1 change: 1 addition & 0 deletions dev/provider_packages/copy_provider_package_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def rename_deprecated_modules(self) -> None:
("airflow.sensors.time_delta", "airflow.sensors.time_delta_sensor"),
("airflow.sensors.weekday", "airflow.contrib.sensors.weekday_sensor"),
("airflow.utils.session", "airflow.utils.db"),
("airflow.utils.yaml", "yaml"),
]
for new, old in changes:
self.qry.select_module(new).rename(old)
Expand Down
7 changes: 6 additions & 1 deletion dev/provider_packages/prepare_provider_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
from rich.console import Console
from rich.syntax import Syntax

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[no-redef]

INITIAL_CHANGELOG_CONTENT = """
Expand Down Expand Up @@ -1325,7 +1330,7 @@ def get_provider_info_from_provider_yaml(provider_package_id: str) -> Dict[str,
if not os.path.exists(provider_yaml_file_name):
raise Exception(f"The provider.yaml file is missing: {provider_yaml_file_name}")
with open(provider_yaml_file_name) as provider_file:
provider_yaml_dict = yaml.safe_load(provider_file.read())
provider_yaml_dict = yaml.load(provider_file, SafeLoader)
provider_info = convert_to_provider_info(provider_yaml_dict)
validate_provider_info_with_2_0_0_schema(provider_info)
validate_provider_info_with_runtime_schema(provider_info)
Expand Down
7 changes: 6 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@

import yaml

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[misc]

import airflow
from airflow.configuration import AirflowConfigParser, default_config_yaml
from docs.exts.docs_build.third_party_inventories import ( # pylint: disable=no-name-in-module,wrong-import-order
Expand Down Expand Up @@ -332,7 +337,7 @@ def _load_config():
return {}

with open(file_path) as config_file:
return yaml.safe_load(config_file)
return yaml.load(config_file, SafeLoader)

config = _load_config()
if config:
Expand Down
7 changes: 6 additions & 1 deletion docs/exts/docs_build/lint_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@

import yaml

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[misc]

import airflow
from docs.exts.docs_build.docs_builder import ALL_PROVIDER_YAMLS # pylint: disable=no-name-in-module
from docs.exts.docs_build.errors import DocBuildError # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -327,7 +332,7 @@ def check_docker_image_tag_in_quick_start_guide() -> List[DocBuildError]:
# master tag is little outdated.
expected_image = f'apache/airflow:{expected_tag}'
with open(compose_file_path) as yaml_file:
content = yaml.safe_load(yaml_file)
content = yaml.load(yaml_file, SafeLoader)
current_image_expression = content['x-airflow-common']['image']
if expected_image not in current_image_expression:
build_errors.append(
Expand Down
8 changes: 7 additions & 1 deletion docs/exts/provider_yaml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
import jsonschema
import yaml

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[misc]


ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
PROVIDER_DATA_SCHEMA_PATH = os.path.join(ROOT_DIR, "airflow", "provider.yaml.schema.json")

Expand Down Expand Up @@ -53,7 +59,7 @@ def load_package_data() -> List[Dict[str, Any]]:
result = []
for provider_yaml_path in get_provider_yaml_paths():
with open(provider_yaml_path) as yaml_file:
provider = yaml.safe_load(yaml_file)
provider = yaml.load(yaml_file, SafeLoader)
try:
jsonschema.validate(provider, schema=schema)
except jsonschema.ValidationError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@

import yaml

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[no-redef]


def main() -> int:
parser = argparse.ArgumentParser()
Expand All @@ -34,7 +39,7 @@ def main() -> int:
retval = 0

with open('.pre-commit-config.yaml', 'rb') as f:
content = yaml.safe_load(f)
content = yaml.load(f, SafeLoader)
errors = get_errors(content, max_length)
if errors:
retval = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
import yaml
from tabulate import tabulate

try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader # type: ignore[no-redef]

if __name__ != "__main__":
raise Exception(
"This file is intended to be executed as an executable program. You cannot use it as a module."
Expand Down Expand Up @@ -60,7 +65,7 @@ def _load_package_data(package_paths: Iterable[str]):
result = {}
for provider_yaml_path in package_paths:
with open(provider_yaml_path) as yaml_file:
provider = yaml.safe_load(yaml_file)
provider = yaml.load(yaml_file, SafeLoader)
rel_path = os.path.relpath(provider_yaml_path, ROOT_DIR)
try:
jsonschema.validate(provider, schema=schema)
Expand Down

0 comments on commit 7daebef

Please sign in to comment.