[AIRFLOW-6290] Create guide for GKE operators (#8883)
tanjinP committed Jun 5, 2020
1 parent 997ddb6 commit f56811d
Showing 5 changed files with 147 additions and 2 deletions.
@@ -32,7 +32,9 @@
GCP_LOCATION = os.environ.get("GCP_GKE_LOCATION", "europe-north1-a")
CLUSTER_NAME = os.environ.get("GCP_GKE_CLUSTER_NAME", "cluster-name")

# [START howto_operator_gcp_gke_create_cluster_definition]
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
# [END howto_operator_gcp_gke_create_cluster_definition]

default_args = {"start_date": days_ago(1)}

@@ -42,12 +44,14 @@
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag:
    # [START howto_operator_gke_create_cluster]
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        body=CLUSTER,
    )
    # [END howto_operator_gke_create_cluster]

    pod_task = GKEStartPodOperator(
        task_id="pod_task",
@@ -59,6 +63,7 @@
        name="test-pod",
    )

    # [START howto_operator_gke_start_pod_xcom]
    pod_task_xcom = GKEStartPodOperator(
        task_id="pod_task_xcom",
        project_id=GCP_PROJECT_ID,
@@ -70,18 +75,23 @@
        cmds=["sh", "-c", 'mkdir -p /airflow/xcom/;echo \'[1,2,3,4]\' > /airflow/xcom/return.json'],
        name="test-pod-xcom",
    )
    # [END howto_operator_gke_start_pod_xcom]

    # [START howto_operator_gke_xcom_result]
    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )
    # [END howto_operator_gke_xcom_result]

    # [START howto_operator_gke_delete_cluster]
    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
    )
    # [END howto_operator_gke_delete_cluster]

    create_cluster >> pod_task >> delete_cluster
    create_cluster >> pod_task_xcom >> delete_cluster
130 changes: 130 additions & 0 deletions docs/howto/operator/gcp/kubernetes_engine.rst
@@ -0,0 +1,130 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Google Kubernetes Engine Operators
==================================

`Google Kubernetes Engine (GKE) <https://cloud.google.com/kubernetes-engine/>`__ provides a managed environment for
deploying, managing, and scaling your containerized applications using Google infrastructure. The GKE environment
consists of multiple machines (specifically, Compute Engine instances) grouped together to form a cluster.

.. contents::
  :depth: 1
  :local:

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include:: _partials/prerequisite_tasks.rst

Manage GKE cluster
^^^^^^^^^^^^^^^^^^

A cluster is the foundation of GKE: all workloads run on top of the cluster. It is made up of a cluster master
and worker nodes. The lifecycle of the master is managed by GKE when creating or deleting a cluster.
The worker nodes are Compute Engine VM instances that GKE creates on your behalf when creating a cluster.

.. _howto/operator:GKECreateClusterOperator:

Create GKE cluster
""""""""""""""""""

Here is an example of a cluster definition:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
    :language: python
    :start-after: [START howto_operator_gcp_gke_create_cluster_definition]
    :end-before: [END howto_operator_gcp_gke_create_cluster_definition]

A dict object like this, or a
:class:`~google.cloud.container_v1.types.Cluster`
definition, is required when creating a cluster with
:class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKECreateClusterOperator`.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_create_cluster]
    :end-before: [END howto_operator_gke_create_cluster]
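
As a rough illustration of the dict form of the cluster definition, the sketch below builds the same minimal body and checks it before it would be handed to the operator. The helper ``build_cluster_body`` and the ``REQUIRED_KEYS`` tuple are hypothetical names introduced only for this example; they are not part of Airflow, and the assumption that ``name`` and ``initial_node_count`` are the keys worth checking is taken from the example definition, not from the operator's documented contract.

```python
# Hypothetical helper for illustration only; not part of Airflow.
# Assumption: the two keys used in the example definition are the ones to check.
REQUIRED_KEYS = ("name", "initial_node_count")


def build_cluster_body(name, initial_node_count=1):
    """Return a minimal cluster definition dict like the one in the example."""
    body = {"name": name, "initial_node_count": initial_node_count}
    # Collect keys whose values are empty or zero so the error names all of them.
    missing = [key for key in REQUIRED_KEYS if not body.get(key)]
    if missing:
        raise ValueError(f"Cluster body is missing required values: {missing}")
    return body


CLUSTER = build_cluster_body("cluster-name")
```

The resulting ``CLUSTER`` dict has the same shape as the definition in the example DAG and could be passed as ``body`` in the same way.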

.. _howto/operator:GKEDeleteClusterOperator:

Delete GKE cluster
""""""""""""""""""

To delete a cluster, use
:class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEDeleteClusterOperator`.
Deleting a cluster also deletes all the nodes allocated to it.

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_delete_cluster]
    :end-before: [END howto_operator_gke_delete_cluster]

Manage workloads on a GKE cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

GKE works with containerized applications, such as those built with Docker, and deploys them to run on the cluster.
These are called workloads. When deployed, they use the CPU and memory resources of the cluster.

.. _howto/operator:GKEStartPodOperator:

Run a Pod on a GKE cluster
""""""""""""""""""""""""""

Two operators are available to run a pod on a GKE cluster:

* :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`
* :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator`

``GKEStartPodOperator`` extends ``KubernetesPodOperator`` to provide authorization using Google Cloud credentials.
There is no need to manage the ``kube_config`` file, as it will be generated automatically.
All Kubernetes parameters (except ``config_file``) are also valid for the ``GKEStartPodOperator``.
For more information on ``KubernetesPodOperator``, see the :ref:`howto/operator:KubernetesPodOperator` guide.

You can enable :ref:`XCom <concepts:xcom>` on the operator. This works by launching a sidecar container
alongside the specified pod. The sidecar is mounted automatically when XCom usage is enabled, and its mount point
is the path ``/airflow/xcom``. To provide values to XCom, ensure your Pod writes them to a file called
``return.json`` in that directory. The contents of this file can then be used downstream in your DAG.
Here is an example of it being used:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_start_pod_xcom]
    :end-before: [END howto_operator_gke_start_pod_xcom]

The pushed value can then be used in other operators:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_xcom_result]
    :end-before: [END howto_operator_gke_xcom_result]
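
To make the data flow concrete, here is a minimal local sketch of the hand-off described above. It does not use Airflow or Kubernetes: a temporary directory stands in for ``/airflow/xcom`` (an assumption made so the sketch runs anywhere), and plain ``json.loads`` stands in for whatever parsing happens on the Airflow side. Only the file name ``return.json``, the payload ``[1,2,3,4]``, and the ``[0]`` index come from the examples above.

```python
import json
import tempfile
from pathlib import Path

# A temporary directory stands in for the /airflow/xcom mount point.
with tempfile.TemporaryDirectory() as xcom_dir:
    return_file = Path(xcom_dir) / "return.json"

    # What the pod's shell command does: write a JSON document to return.json.
    return_file.write_text("[1,2,3,4]")

    # Conceptually, the file content is parsed as JSON and becomes the XCom value.
    xcom_value = json.loads(return_file.read_text())

# Rough equivalent of {{ task_instance.xcom_pull('pod_task_xcom')[0] }}
first_item = xcom_value[0]
print(first_item)
```

Because the payload is a JSON list, the downstream template can index into it directly; anything written to ``return.json`` should therefore be valid JSON.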

Reference
^^^^^^^^^

For further information, look at:

* `GKE API Documentation <https://cloud.google.com/kubernetes-engine/docs/reference/rest>`__
* `Product Documentation <https://cloud.google.com/kubernetes-engine/docs/>`__
* `Kubernetes Documentation <https://kubernetes.io/docs/home/>`__
6 changes: 6 additions & 0 deletions docs/howto/operator/kubernetes.rst
@@ -22,6 +22,12 @@
KubernetesPodOperator
=====================

.. note::
    If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__, consider
    using the
    :ref:`GKEStartPodOperator <howto/operator:GKEStartPodOperator>` operator as it
    simplifies the Kubernetes authorization process.

The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator`:

* Launches a Docker image as a Kubernetes Pod to execute an individual Airflow
2 changes: 1 addition & 1 deletion docs/operators-and-hooks-ref.rst
@@ -735,7 +735,7 @@ These integrations allow you to perform various operations within the Google Clo
-

* - `Kubernetes Engine <https://cloud.google.com/kubernetes_engine/>`__
-
- :doc:`How to use <howto/operator/gcp/kubernetes_engine>`
- :mod:`airflow.providers.google.cloud.hooks.kubernetes_engine`
- :mod:`airflow.providers.google.cloud.operators.kubernetes_engine`
-
1 change: 0 additions & 1 deletion tests/test_project_structure.py
@@ -146,7 +146,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
'datastore',
'dlp',
'gcs_to_bigquery',
'kubernetes_engine',
'mlengine',
'mssql_to_gcs',
'mysql_to_gcs',