AIP-58: Add Airflow ObjectStore (AFS) (#34729)
This adds the ObjectStorage and ObjectStoragePath APIs per AIP-58. ObjectStoragePath is a pathlib.Path-like interface for objects residing on object storage.
bolkedebruin committed Oct 27, 2023
1 parent 85f0ef3 commit 04e2fbd
Showing 57 changed files with 2,876 additions and 68 deletions.
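
For orientation, here is a minimal sketch of the API this commit introduces, distilled from the tutorial DAG and airflow/io/__init__.py added below; the bucket name, object key, and connection id are illustrative placeholders, not part of the diff:

from airflow.io.store.path import ObjectStoragePath

# "my-bucket" and "aws_default" are placeholder values for this sketch.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
base.mkdir(exists_ok=True)           # create the bucket/prefix if it does not exist yet
target = base / "hello.txt"          # pathlib.Path-style joining
with target.open("wb") as f:         # file-like access backed by fsspec
    f.write(b"hello, object storage\n")

Under the hood the scheme ("s3" here) is resolved to an fsspec filesystem through get_fs() in airflow/io/__init__.py, supplied by the provider package that registers that scheme.
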
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
@@ -48,6 +48,7 @@ body:
- celery
- cloudant
- cncf-kubernetes
- common-io
- common-sql
- daskexecutor
- databricks
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -830,6 +830,7 @@ jobs:
# pip download --no-deps --dest dist apache-airflow-providers-<PROVIDER>==3.1.0
#
rm -vf dist/apache_airflow_providers_openlineage*.whl
rm -rf dist/apache_airflow_providers_common_io*.whl
- name: "Get all provider extras as AIRFLOW_EXTRAS env variable"
# Extras might be different on S3 so rather than relying on "all" we should get the list of
# packages to be installed from the current provider_dependencies.json file
19 changes: 10 additions & 9 deletions CONTRIBUTING.rst
@@ -671,15 +671,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups,
cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud,
deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker,
druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas,
papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba,
segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog,
dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github,
github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE
Provider packages
4 changes: 2 additions & 2 deletions Dockerfile.ci
@@ -1007,7 +1007,7 @@ if [[ ${UPGRADE_BOTO=} == "true" ]]; then
echo
echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}"
echo
pip uninstall --root-user-action ignore aiobotocore -y || true
pip uninstall --root-user-action ignore aiobotocore s3fs -y || true
pip install --root-user-action ignore --upgrade boto3 botocore
pip check
fi
@@ -1468,7 +1468,7 @@ RUN echo "Airflow version: ${AIRFLOW_VERSION}"
# Without grpcio-status limit, pip gets into very long backtracking
# We should attempt to remove it in the future
#
ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0"
ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0 aiobotocore>=2.7.0"
ARG UPGRADE_TO_NEWER_DEPENDENCIES="false"
ARG VERSION_SUFFIX_FOR_PYPI=""

19 changes: 10 additions & 9 deletions INSTALL
@@ -98,15 +98,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups,
cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud,
deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker,
druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas,
papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba,
segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog,
dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github,
github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE

# For installing Airflow in development environments - see CONTRIBUTING.rst
135 changes: 135 additions & 0 deletions airflow/example_dags/tutorial_objectstorage.py
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

# [START tutorial]
# [START import_module]
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.store.path import ObjectStoragePath

# [END import_module]

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
"fmisid": "int32",
"time": "datetime64[ns]",
"AQINDEX_PT1H_avg": "float64",
"PM10_PT1H_avg": "float64",
"PM25_PT1H_avg": "float64",
"O3_PT1H_avg": "float64",
"CO_PT1H_avg": "float64",
"SO2_PT1H_avg": "float64",
"NO2_PT1H_avg": "float64",
"TRSC_PT1H_avg": "float64",
}

# [START create_object_storage_path]
base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
# [END create_object_storage_path]


# [START instantiate_dag]
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
"""
### Object Storage Tutorial Documentation
This is a tutorial DAG to showcase the usage of the Object Storage API.
Documentation that goes along with the Airflow Object Storage tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
"""
# [END instantiate_dag]
import duckdb
import pandas as pd

# [START get_air_quality_data]
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
"""
#### Get Air Quality Data
This task gets air quality data from the Finnish Meteorological Institute's
open data API. The data is saved as parquet.
"""
execution_date = kwargs["logical_date"]
start_time = kwargs["data_interval_start"]

params = {
"format": "json",
"precision": "double",
"groupareas": "0",
"producer": "airquality_urban",
"area": "Uusimaa",
"param": ",".join(aq_fields.keys()),
"starttime": start_time.isoformat(timespec="seconds"),
"endtime": execution_date.isoformat(timespec="seconds"),
"tz": "UTC",
}

response = requests.get(API, params=params)
response.raise_for_status()

# ensure the bucket exists
base.mkdir(exists_ok=True)

formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"

df = pd.DataFrame(response.json()).astype(aq_fields)
with path.open("wb") as file:
df.to_parquet(file)

return path

# [END get_air_quality_data]

# [START analyze]
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
#### Analyze
This task analyzes the air quality data, prints the results
"""
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

print(df2.head())

# [END analyze]

# [START main_flow]
obj_path = get_air_quality_data()
analyze(obj_path)
# [END main_flow]


# [START dag_invocation]
tutorial_objectstorage()
# [END dag_invocation]
# [END tutorial]
90 changes: 90 additions & 0 deletions airflow/io/__init__.py
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import (
    TYPE_CHECKING,
    Callable,
)

from fsspec.implementations.local import LocalFileSystem

from airflow.compat.functools import cache
from airflow.providers_manager import ProvidersManager
from airflow.stats import Stats
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
    from fsspec import AbstractFileSystem

log = logging.getLogger(__name__)


def _file(_: str | None) -> LocalFileSystem:
    return LocalFileSystem()


# builtin supported filesystems
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], AbstractFileSystem]] = {
"file": _file,
}


@cache
def _register_filesystems() -> dict[str, Callable[[str | None], AbstractFileSystem]]:
    scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
    with Stats.timer("airflow.io.load_filesystems") as timer:
        manager = ProvidersManager()
        for fs_module_name in manager.filesystem_module_names:
            fs_module = import_string(fs_module_name)
            for scheme in getattr(fs_module, "schemes", []):
                if scheme in scheme_to_fs:
                    log.warning("Overriding scheme %s for %s", scheme, fs_module_name)

                method = getattr(fs_module, "get_fs", None)
                if method is None:
                    raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method")
                scheme_to_fs[scheme] = method

    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return scheme_to_fs


def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem:
"""
Get a filesystem by scheme.
:param scheme: the scheme to get the filesystem for
:return: the filesystem method
:param conn_id: the airflow connection id to use
"""
filesystems = _register_filesystems()
try:
return filesystems[scheme](conn_id)
except KeyError:
raise ValueError(f"No filesystem registered for scheme {scheme}")


def has_fs(scheme: str) -> bool:
"""
Check if a filesystem is available for a scheme.
:param scheme: the scheme to check
:return: True if a filesystem is available for the scheme
"""
return scheme in _register_filesystems()

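To illustrate what _register_filesystems() above expects from a provider, here is a hypothetical filesystem module of the kind referenced by ProvidersManager().filesystem_module_names: a module-level schemes list and a get_fs(conn_id) callable, which the loader looks up via getattr. The contents below (the "mem" scheme and the in-memory filesystem) are assumptions for the sketch, not part of this commit:

from __future__ import annotations

from fsspec import AbstractFileSystem
from fsspec.implementations.memory import MemoryFileSystem

# Schemes this module serves; the loader maps each one to get_fs below.
schemes = ["mem"]


def get_fs(conn_id: str | None) -> AbstractFileSystem:
    # A real provider would construct the filesystem from the Airflow connection;
    # an in-memory filesystem keeps the sketch self-contained.
    return MemoryFileSystem()

With such a module registered, has_fs("mem") would return True and get_fs("mem") would return the filesystem instance above.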