Skip to content

Commit

Permalink
Add Dataplex Data Quality operators. (#32256)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Beata Kossakowska <[email protected]>
  • Loading branch information
bkossakowska and Beata Kossakowska committed Aug 13, 2023
1 parent 93c3ccb commit dfb2403
Show file tree
Hide file tree
Showing 9 changed files with 2,454 additions and 18 deletions.
500 changes: 498 additions & 2 deletions airflow/providers/google/cloud/hooks/dataplex.py

Large diffs are not rendered by default.

859 changes: 854 additions & 5 deletions airflow/providers/google/cloud/operators/dataplex.py

Large diffs are not rendered by default.

126 changes: 124 additions & 2 deletions airflow/providers/google/cloud/sensors/dataplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@
"""This module contains Google Dataplex sensors."""
from __future__ import annotations

import time
from typing import TYPE_CHECKING, Sequence

if TYPE_CHECKING:
from airflow.utils.context import Context

from google.api_core.exceptions import GoogleAPICallError
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry
from google.cloud.dataplex_v1.types import DataScanJob

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
from airflow.providers.google.cloud.hooks.dataplex import (
AirflowDataQualityScanException,
AirflowDataQualityScanResultTimeoutException,
DataplexHook,
)
from airflow.sensors.base import BaseSensorOperator


Expand Down Expand Up @@ -114,3 +120,119 @@ def poke(self, context: Context) -> bool:
self.log.info("Current status of the Dataplex task %s => %s", self.dataplex_task_id, task_status)

return task_status == TaskState.ACTIVE


class DataplexDataQualityJobStatusSensor(BaseSensorOperator):
"""
Check the status of the Dataplex DataQuality job.
:param project_id: Required. The ID of the Google Cloud project that the task belongs to.
:param region: Required. The ID of the Google Cloud region that the task belongs to.
:param data_scan_id: Required. Data Quality scan identifier.
:param job_id: Required. Job ID.
:param api_version: The version of the api that will be requested for example 'v3'.
:param retry: A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param metadata: Additional metadata that is provided to the method.
:param gcp_conn_id: The connection ID to use when fetching connection info.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param result_timeout: Value in seconds for which operator will wait for the Data Quality scan result.
Throws exception if there is no result found after specified amount of seconds.
:param fail_on_dq_failure: If set to true and not all Data Quality scan rules have been passed,
an exception is thrown. If set to false and not all Data Quality scan rules have been passed,
execution will finish with success.
:return: Boolean indicating if the job run has reached the ``DataScanJob.State.SUCCEEDED``.
"""

template_fields = ["job_id"]

def __init__(
self,
project_id: str,
region: str,
data_scan_id: str,
job_id: str,
api_version: str = "v1",
retry: Retry | _MethodDefault = DEFAULT,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
fail_on_dq_failure: bool = False,
result_timeout: float = 60.0 * 10,
start_sensor_time: float = time.monotonic(),
*args,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
self.project_id = project_id
self.region = region
self.data_scan_id = data_scan_id
self.job_id = job_id
self.api_version = api_version
self.retry = retry
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.fail_on_dq_failure = fail_on_dq_failure
self.result_timeout = result_timeout
self.start_sensor_time = start_sensor_time

def execute(self, context: Context) -> None:
super().execute(context)

def _duration(self):
return time.monotonic() - self.start_sensor_time

def poke(self, context: Context) -> bool:
self.log.info("Waiting for job %s to be %s", self.job_id, DataScanJob.State.SUCCEEDED)
if self.result_timeout:
duration = self._duration()
if duration > self.result_timeout:
raise AirflowDataQualityScanResultTimeoutException(
f"Timeout: Data Quality scan {self.job_id} is not ready after {self.result_timeout}s"
)

hook = DataplexHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
impersonation_chain=self.impersonation_chain,
)

try:
job = hook.get_data_scan_job(
project_id=self.project_id,
region=self.region,
data_scan_id=self.data_scan_id,
job_id=self.job_id,
timeout=self.timeout,
retry=self.retry,
metadata=self.metadata,
)
except GoogleAPICallError as e:
raise AirflowException(
f"Error occurred when trying to retrieve Data Quality scan job: {self.data_scan_id}", e
)

job_status = job.state
self.log.info(
"Current status of the Dataplex Data Quality scan job %s => %s", self.job_id, job_status
)
if job_status == DataScanJob.State.FAILED:
raise AirflowException(f"Data Quality scan job failed: {self.job_id}")
if job_status == DataScanJob.State.CANCELLED:
raise AirflowException(f"Data Quality scan job cancelled: {self.job_id}")
if self.fail_on_dq_failure:
if job_status == DataScanJob.State.SUCCEEDED and not job.data_quality_result.passed:
raise AirflowDataQualityScanException(
f"Data Quality job {self.job_id} execution failed due to failure of its scanning "
f"rules: {self.data_scan_id}"
)
return job_status == DataScanJob.State.SUCCEEDED
165 changes: 164 additions & 1 deletion docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ With this configuration we can create the lake:
:start-after: [START howto_dataplex_create_lake_operator]
:end-before: [END howto_dataplex_create_lake_operator]


Delete a lake
-------------

Expand All @@ -142,3 +141,167 @@ To delete a lake you can use:
:dedent: 4
:start-after: [START howto_dataplex_delete_lake_operator]
:end-before: [END howto_dataplex_delete_lake_operator]

Create or update a Data Quality scan
------------------------------------

Before you create a Dataplex Data Quality scan you need to define its body.
For more information about the available fields to pass when creating a Data Quality scan, visit `Dataplex create data quality API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__

A simple Data Quality scan configuration can look as followed:

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_data_quality_configuration]
:end-before: [END howto_dataplex_data_quality_configuration]

With this configuration we can create or update the Data Quality scan:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateOrUpdateDataQualityScanOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_create_data_quality_operator]
:end-before: [END howto_dataplex_create_data_quality_operator]

Get a Data Quality scan
-----------------------

To get a Data Quality scan you can use:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_get_data_quality_operator]
:end-before: [END howto_dataplex_get_data_quality_operator]



Delete a Data Quality scan
--------------------------

To delete a Data Quality scan you can use:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteDataQualityScanOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_delete_data_quality_operator]
:end-before: [END howto_dataplex_delete_data_quality_operator]

Run a Data Quality scan
-----------------------

You can run Dataplex Data Quality scan in asynchronous modes to later check its status using sensor:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexRunDataQualityScanOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_run_data_quality_operator]
:end-before: [END howto_dataplex_run_data_quality_operator]

To check that running Dataplex Data Quality scan succeeded you can use:

:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexDataQualityJobStatusSensor`.

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_data_scan_job_state_sensor]
:end-before: [END howto_dataplex_data_scan_job_state_sensor]

Get a Data Quality scan job
---------------------------

To get a Data Quality scan job you can use:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanResultOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_get_data_quality_job_operator]
:end-before: [END howto_dataplex_get_data_quality_job_operator]

Create a zone
-------------

Before you create a Dataplex zone you need to define its body.

For more information about the available fields to pass when creating a zone, visit `Dataplex create zone API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones#Zone>`__

A simple zone configuration can look as followed:

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_zone_configuration]
:end-before: [END howto_dataplex_zone_configuration]

With this configuration we can create a zone:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateZoneOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_create_zone_operator]
:end-before: [END howto_dataplex_create_zone_operator]

Delete a zone
-------------

To delete a zone you can use:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteZoneOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_delete_zone_operator]
:end-before: [END howto_dataplex_delete_zone_operator]

Create a asset
--------------

Before you create a Dataplex asset you need to define its body.

For more information about the available fields to pass when creating a asset, visit `Dataplex create asset API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones.assets#Asset>`__

A simple asset configuration can look as followed:

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_asset_configuration]
:end-before: [END howto_dataplex_asset_configuration]

With this configuration we can create the asset:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateAssetOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_create_asset_operator]
:end-before: [END howto_dataplex_create_asset_operator]

Delete a asset
--------------

To delete a asset you can use:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteAssetOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_delete_asset_operator]
:end-before: [END howto_dataplex_delete_asset_operator]
3 changes: 3 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ datapoint
Dataprep
Dataproc
dataproc
DataScan
dataScans
Dataset
dataset
datasetId
Expand Down Expand Up @@ -485,6 +487,7 @@ DOS'ing
DownloadReportV
downscaling
downstreams
dq
Drillbit
Drivy
dropdown
Expand Down

0 comments on commit dfb2403

Please sign in to comment.