Skip to content

Commit

Permalink
Add Dataprep operators (#10304)
Browse files Browse the repository at this point in the history
Add DataprepGetJobGroupOperator and DataprepRunJobGroupOperator
for Dataprep service.

Co-authored-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
michalslowikowski00 and turbaszek committed Sep 1, 2020
1 parent f40ac9b commit 804548d
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 67 deletions.
2 changes: 1 addition & 1 deletion airflow/models/connection.py
Expand Up @@ -51,7 +51,7 @@
),
"cassandra": ("airflow.providers.apache.cassandra.hooks.cassandra.CassandraHook", "cassandra_conn_id"),
"cloudant": ("airflow.providers.cloudant.hooks.cloudant.CloudantHook", "cloudant_conn_id"),
"dataprep": ("airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook", "dataprep_conn_id"),
"dataprep": ("airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook", "dataprep_default"),
"docker": ("airflow.providers.docker.hooks.docker.DockerHook", "docker_conn_id"),
"elasticsearch": (
"airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
Expand Down
45 changes: 41 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_dataprep.py
Expand Up @@ -17,19 +17,56 @@
"""
Example Airflow DAG that shows how to use Google Dataprep.
"""
import os

from airflow import models
from airflow.providers.google.cloud.operators.dataprep import DataprepGetJobsForJobGroupOperator
from airflow.providers.google.cloud.operators.dataprep import (
DataprepGetJobGroupOperator,
DataprepGetJobsForJobGroupOperator,
DataprepRunJobGroupOperator,
)
from airflow.utils import dates

JOB_ID = 6269792
DATAPREP_JOB_ID = int(os.environ.get('DATAPREP_JOB_ID', 12345677))
DATAPREP_JOB_RECIPE_ID = int(os.environ.get('DATAPREP_JOB_RECIPE_ID', 12345677))
DATAPREP_BUCKET = os.environ.get("DATAPREP_BUCKET", "gs://afl-sql/[email protected]")

DATA = {
"wrangledDataset": {"id": DATAPREP_JOB_RECIPE_ID},
"overrides": {
"execution": "dataflow",
"profiler": False,
"writesettings": [
{
"path": DATAPREP_BUCKET,
"action": "create",
"format": "csv",
"compression": "none",
"header": False,
"asSingleFile": False,
}
],
},
}


with models.DAG(
"example_dataprep", schedule_interval=None, start_date=dates.days_ago(1) # Override to match your needs
"example_dataprep", schedule_interval=None, start_date=dates.days_ago(1), # Override to match your needs
) as dag:
# [START how_to_dataprep_run_job_group_operator]
run_job_group = DataprepRunJobGroupOperator(task_id="run_job_group", body_request=DATA)
# [END how_to_dataprep_run_job_group_operator]

# [START how_to_dataprep_get_jobs_for_job_group_operator]
get_jobs_for_job_group = DataprepGetJobsForJobGroupOperator(
task_id="get_jobs_for_job_group", job_id=JOB_ID
task_id="get_jobs_for_job_group", job_id=DATAPREP_JOB_ID
)
# [END how_to_dataprep_get_jobs_for_job_group_operator]

# [START how_to_dataprep_get_job_group_operator]
get_job_group = DataprepGetJobGroupOperator(
task_id="get_job_group", job_group_id=DATAPREP_JOB_ID, embed="", include_deleted=False,
)
# [END how_to_dataprep_get_job_group_operator]

run_job_group >> [get_jobs_for_job_group, get_job_group]
76 changes: 59 additions & 17 deletions airflow/providers/google/cloud/hooks/dataprep.py
Expand Up @@ -18,12 +18,14 @@
"""
This module contains Google Dataprep hook.
"""
import json
import os
from typing import Any, Dict

import requests
from requests import HTTPError
from tenacity import retry, stop_after_attempt, wait_exponential

from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook


Expand All @@ -37,10 +39,13 @@ class GoogleDataprepHook(BaseHook):
"""

def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
def __init__(self, dataprep_conn_id: str = "dataprep_default") -> None:
super().__init__()
self.dataprep_conn_id = dataprep_conn_id
self._url = "https://api.clouddataprep.com/v4/jobGroups"
conn = self.get_connection(self.dataprep_conn_id)
extra_dejson = conn.extra_dejson
self._token = extra_dejson.get("extra__dataprep__token")
self._base_url = extra_dejson.get("extra__dataprep__base_url", "https://api.clouddataprep.com")

@property
def _headers(self) -> Dict[str, str]:
Expand All @@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
}
return headers

@property
def _token(self) -> str:
conn = self.get_connection(self.dataprep_conn_id)
token = conn.extra_dejson.get("token")
if token is None:
raise AirflowException(
"Dataprep token is missing or has invalid format. "
"Please make sure that Dataprep token is added to the Airflow Connections."
)
return token

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
"""
Get information about the batch jobs within a Cloud Dataprep job.
:param job_id The ID of the job that will be fetched.
:param job_id: The ID of the job that will be fetched
:type job_id: int
"""
url: str = f"{self._url}/{job_id}/jobs"

endpoint_path = f"v4/jobGroups/{job_id}/jobs"
url: str = os.path.join(self._base_url, endpoint_path)
response = requests.get(url, headers=self._headers)
response.raise_for_status()
self._raise_for_status(response)
return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
"""
Get the specified job group.
A job group is a job that is executed from a specific node in a flow.
:param job_group_id: The ID of the job that will be fetched
:type job_group_id: int
:param embed: Comma-separated list of objects to pull in as part of the response
:type embed: str
:param include_deleted: if set to "true", will include deleted objects
:type include_deleted: bool
"""

params: Dict[str, Any] = {"embed": embed, "includeDeleted": include_deleted}
endpoint_path = f"v4/jobGroups/{job_group_id}"
url: str = os.path.join(self._base_url, endpoint_path)
response = requests.get(url, headers=self._headers, params=params)
self._raise_for_status(response)
return response.json()

@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
def run_job_group(self, body_request: dict) -> Dict[str, Any]:
"""
Creates a ``jobGroup``, which launches the specified job as the authenticated user.
This performs the same action as clicking on the Run Job button in the application.
To get recipe_id please follow the Dataprep API documentation
https://clouddataprep.com/documentation/api#operation/runJobGroup
:param body_request: The identifier for the recipe you would like to run.
:type body_request: dict
"""

endpoint_path = "v4/jobGroups"
url: str = os.path.join(self._base_url, endpoint_path)
response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
self._raise_for_status(response)
return response.json()

def _raise_for_status(self, response: requests.models.Response) -> None:
try:
response.raise_for_status()
except HTTPError:
self.log.error(response.json().get('exception'))
raise
76 changes: 73 additions & 3 deletions airflow/providers/google/cloud/operators/dataprep.py
Expand Up @@ -35,20 +35,90 @@ class DataprepGetJobsForJobGroupOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataprepGetJobsForJobGroupOperator`
:param job_id The ID of the job that will be requests
:type job_id: int
"""

template_fields = ("job_id",)

@apply_defaults
def __init__(self, *, job_id: int, **kwargs) -> None:
def __init__(self, *, dataprep_conn_id: str = "dataprep_default", job_id: int, **kwargs) -> None:
super().__init__(**kwargs)
self.dataprep_conn_id = (dataprep_conn_id,)
self.job_id = job_id

def execute(self, context: Dict):
self.log.info("Fetching data for job with id: %d ...", self.job_id)
hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
hook = GoogleDataprepHook(dataprep_conn_id="dataprep_default",)
response = hook.get_jobs_for_job_group(job_id=self.job_id)
return response


class DataprepGetJobGroupOperator(BaseOperator):
"""
Get the specified job group.
A job group is a job that is executed from a specific node in a flow.
API documentation https://clouddataprep.com/documentation/api#section/Overview
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:DataprepGetJobGroupOperator`
:param job_group_id: The ID of the job that will be requests
:type job_group_id: int
:param embed: Comma-separated list of objects to pull in as part of the response
:type embed: string
:param include_deleted: if set to "true", will include deleted objects
:type include_deleted: bool
"""

template_fields = ("job_group_id", "embed")

@apply_defaults
def __init__(
self,
*,
dataprep_conn_id: str = "dataprep_default",
job_group_id: int,
embed: str,
include_deleted: bool,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.dataprep_conn_id: str = dataprep_conn_id
self.job_group_id = job_group_id
self.embed = embed
self.include_deleted = include_deleted

def execute(self, context: Dict):
self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
response = hook.get_job_group(
job_group_id=self.job_group_id, embed=self.embed, include_deleted=self.include_deleted,
)
return response


class DataprepRunJobGroupOperator(BaseOperator):
"""
Create a ``jobGroup``, which launches the specified job as the authenticated user.
This performs the same action as clicking on the Run Job button in the application.
To get recipe_id please follow the Dataprep API documentation
https://clouddataprep.com/documentation/api#operation/runJobGroup
:param recipe_id: The identifier for the recipe you would like to run.
:type recipe_id: int
"""

template_fields = ("body_request",)

def __init__(self, *, dataprep_conn_id: str = "dataprep_default", body_request: dict, **kwargs) -> None:
super().__init__(**kwargs)
self.body_request = body_request
self.dataprep_conn_id = dataprep_conn_id

def execute(self, context: None):
self.log.info("Creating a job...")
hook = GoogleDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
response = hook.run_job_group(body_request=self.body_request)
return response
76 changes: 62 additions & 14 deletions docs/howto/operator/google/cloud/dataprep.rst
Expand Up @@ -17,7 +17,30 @@
Google Dataprep Operators
=========================
`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
For more information about the service visit `Google Dataprep API documentation <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`_

Before you begin
^^^^^^^^^^^^^^^^
Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep `instructions <https://clouddataprep.com/documentation/api#section/Authentication>`_ to do it.

TOKEN should be added to the Connection in Airflow in JSON format.
You can check `how to do such connection <https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui>`_.

The DataprepRunJobGroupOperator will run specified job. Operator required a recipe id. To identify the recipe id please use `API documentation for runJobGroup <https://clouddataprep.com/documentation/api#operation/runJobGroup>`_
E.g. if the URL is /flows/10?recipe=7, the recipe id is 7. The recipe cannot be created via this operator. It can be created only via UI which is available `here <https://clouddataprep.com/>`_.
Some of parameters can be override by DAG's body request. How to do it is shown in example dag.

See following example:
Set values for these fields:
.. code-block::
Conn Id: "your_conn_id"
Extra: {"extra__dataprep__token": "TOKEN",
"extra__dataprep__base_url": "https://api.clouddataprep.com"}
.. contents::
:depth: 1
Expand All @@ -28,33 +51,58 @@ Prerequisite Tasks

.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst

.. _howto/operator:DataprepRunJobGroupOperator:

Run Job Group
^^^^^^^^^^^^^

Operator task is to create a job group, which launches the specified job as the authenticated user.
This performs the same action as clicking on the Run Job button in the application.

To get information about jobs within a Cloud Dataprep job use:
:class:`~airflow.providers.google.cloud.operators.dataprep.DataprepRunJobGroupOperator`

Example usage:

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_dataprep.py
:language: python
:dedent: 4
:start-after: [START how_to_dataprep_run_job_group_operator]
:end-before: [END how_to_dataprep_run_job_group_operator]

.. _howto/operator:DataprepGetJobsForJobGroupOperator:

Get Jobs For Job Group
^^^^^^^^^^^^^^^^^^^^^^

Operator task is to get information about the batch jobs within a Cloud Dataprep job.

To get information about jobs within a Cloud Dataprep job use:
:class:`~airflow.providers.google.cloud.operators.dataprep.DataprepGetJobsForJobGroupOperator`

To get connection Dataprep with Airflow you need Dataprep token.
Please follow Dataprep instructions.
https://clouddataprep.com/documentation/api#section/Authentication
Example usage:

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_dataprep.py
:language: python
:dedent: 4
:start-after: [START how_to_dataprep_get_jobs_for_job_group_operator]
:end-before: [END how_to_dataprep_get_jobs_for_job_group_operator]

.. _howto/operator:DataprepGetJobGroupOperator:

It should be added to the Connection in Airflow in JSON format.
Her you can check how to do such connection:
https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui
Get Job Group
^^^^^^^^^^^^^

See following example:
Set values for these fields:
.. code-block::
Operator task is to get the specified job group.
A job group is a job that is executed from a specific node in a flow.

Conn Id: "your_conn_id"
Extra: "{\"token\": \"TOKEN\"}
To get information about jobs within a Cloud Dataprep job use:
:class:`~airflow.providers.google.cloud.operators.dataprep.DataprepGetJobGroupOperator`

Example usage:

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_dataprep.py
:language: python
:dedent: 4
:start-after: [START how_to_dataprep_get_jobs_for_job_group_operator]
:end-before: [END how_to_dataprep_get_jobs_for_job_group_operator]
:start-after: [START how_to_dataprep_get_job_group_operator]
:end-before: [END how_to_dataprep_get_job_group_operator]

0 comments on commit 804548d

Please sign in to comment.