Skip to content

Commit

Permalink
Add Google Authentication for experimental API (#9848)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Jul 22, 2020
1 parent 708197b commit 39a0288
Show file tree
Hide file tree
Showing 18 changed files with 864 additions and 26 deletions.
4 changes: 3 additions & 1 deletion airflow/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def load_auth():
pass

try:
return import_module(auth_backend)
auth_backend = import_module(auth_backend)
log.info("Loaded API auth backend: %s", auth_backend)
return auth_backend
except ImportError as err:
log.critical(
"Cannot import %s for API authentication due to: %s",
Expand Down
8 changes: 7 additions & 1 deletion airflow/api/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@ def get_current_api_client() -> Client:
Return current API Client based on current Airflow configuration
"""
api_module = import_module(conf.get('cli', 'api_client')) # type: Any
auth_backend = api.load_auth()
session = None
session_factory = getattr(auth_backend, 'create_client_session', None)
if session_factory:
session = session_factory()
api_client = api_module.Client(
api_base_url=conf.get('cli', 'endpoint_url'),
auth=api.load_auth().CLIENT_AUTH
auth=getattr(auth_backend, 'CLIENT_AUTH', None),
session=session
)
return api_client
7 changes: 5 additions & 2 deletions airflow/api/client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
# specific language governing permissions and limitations
# under the License.
"""Client for all the API clients."""
import requests


class Client:
"""Base API client for all API clients."""

def __init__(self, api_base_url, auth):
def __init__(self, api_base_url, auth=None, session=None):
self._api_base_url = api_base_url
self._auth = auth
self._session: requests.Session = session or requests.Session()
if auth:
self._session.auth = auth

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
"""Create a dag run for the specified dag.
Expand Down
6 changes: 1 addition & 5 deletions airflow/api/client/json_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

from urllib.parse import urljoin

import requests

from airflow.api.client import api_client


Expand All @@ -30,12 +28,10 @@ class Client(api_client.Client):
def _request(self, url, method='GET', json=None):
params = {
'url': url,
'auth': self._auth,
}
if json is not None:
params['json'] = json

resp = getattr(requests, method.lower())(**params) # pylint: disable=not-callable
resp = getattr(self._session, method.lower())(**params) # pylint: disable=not-callable
if not resp.ok:
# It is justified here because there might be many resp types.
# noinspection PyBroadException
Expand Down
16 changes: 16 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,22 @@
type: integer
example: ~
default: "100"
- name: google_oauth2_audience
description: The intended audience for JWT token credentials used for authorization.
This value must match on the client and server sides.
If empty, audience will not be tested.
type: string
example: project-id-random-value.apps.googleusercontent.com
default: ""
- name: google_key_path
description: |
Path to GCP Credential JSON file. If ommited, authorization based on `the Application Default
Credentials
<https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
be used.
type: string
example: /files/service-account-json
default: ""
- name: lineage
description: ~
options:
Expand Down
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,17 @@ maximum_page_limit = 100
# If no limit is supplied, the OpenApi spec default is used.
fallback_page_limit = 100

# The intended audience for JWT token credentials used for authorization. This value must match on the client and server sides. If empty, audience will not be tested.
# Example: google_oauth2_audience = project-id-random-value.apps.googleusercontent.com
google_oauth2_audience =

# Path to GCP Credential JSON file. If ommited, authorization based on `the Application Default
# Credentials
# <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
# be used.
# Example: google_key_path = /files/service-account-json
google_key_path =

[lineage]
# what lineage backend to use
backend =
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/google/common/auth_backend/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
138 changes: 138 additions & 0 deletions airflow/providers/google/common/auth_backend/google_openid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Authentication backend that use Google credentials for authorization."""
import logging
from functools import wraps
from typing import Callable, Optional, TypeVar, cast

import google
import google.auth.transport.requests
import google.oauth2.id_token
from flask import Response, _request_ctx_stack, current_app, request as flask_request # type: ignore
from google.auth import exceptions
from google.auth.transport.requests import AuthorizedSession
from google.oauth2 import service_account

from airflow.configuration import conf
from airflow.providers.google.common.utils.id_token_credentials import get_default_id_token_credentials

log = logging.getLogger(__name__)

_GOOGLE_ISSUERS = ("accounts.google.com", "https://accounts.google.com")
AUDIENCE = conf.get("api", "google_oauth2_audience")


def create_client_session():
"""Create a HTTP authorized client."""
service_account_path = conf.get("api", "google_key_path")
if service_account_path:
id_token_credentials = service_account.IDTokenCredentials.from_service_account_file(
service_account_path
)
else:
id_token_credentials = get_default_id_token_credentials(target_audience=AUDIENCE)
return AuthorizedSession(credentials=id_token_credentials)


def init_app(_):
"""Initializes authentication."""


def _get_id_token_from_request(request) -> Optional[str]:
authorization_header = request.headers.get("Authorization")

if not authorization_header:
return None

authorization_header_parts = authorization_header.split(" ", 2)

if len(authorization_header_parts) != 2 or authorization_header_parts[0].lower() != "bearer":
return None

id_token = authorization_header_parts[1]
return id_token


def _verify_id_token(id_token: str) -> Optional[str]:
try:
request_adapter = google.auth.transport.requests.Request()
id_info = google.oauth2.id_token.verify_token(id_token, request_adapter, AUDIENCE)
except exceptions.GoogleAuthError:
return None

# This check is part of google-auth v1.19.0 (2020-07-09), In order not to create strong version
# requirements to too new version, we check it in our code too.
# One day, we may delete this code and set minimum version in requirements.
if id_info.get("iss") not in _GOOGLE_ISSUERS:
return None

if not id_info.get("email_verified", False):
return None

return id_info.get("email")


def _lookup_user(user_email: str):
security_manager = current_app.appbuilder.sm
user = security_manager.find_user(email=user_email)

if not user:
return None

if not user.is_active:
return None

return user


def _set_current_user(user):
ctx = _request_ctx_stack.top
ctx.user = user


T = TypeVar("T", bound=Callable) # pylint: disable=invalid-name


def requires_authentication(function: T):
"""Decorator for functions that require authentication."""

@wraps(function)
def decorated(*args, **kwargs):
access_token = _get_id_token_from_request(flask_request)
if not access_token:
log.debug("Missing ID Token")
return Response("Forbidden", 403)

userid = _verify_id_token(access_token)
if not userid:
log.debug("Invalid ID Token")
return Response("Forbidden", 403)

log.debug("Looking for user with e-mail: %s", userid)

user = _lookup_user(userid)
if not user:
return Response("Forbidden", 403)

log.debug("Found user: %s", user)

_set_current_user(user)

return function(*args, **kwargs)

return cast(T, decorated)
16 changes: 16 additions & 0 deletions airflow/providers/google/common/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

0 comments on commit 39a0288

Please sign in to comment.