Fix system test for MetastoreHivePartitionSensor (#32861)
moiseenkov committed Jul 31, 2023
1 parent c422920 commit ce5eebd
Showing 3 changed files with 190 additions and 14 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/google/provider.yaml
@@ -94,7 +94,7 @@ dependencies:
- google-cloud-dataform>=0.5.0
- google-cloud-dataplex>=1.4.2
- google-cloud-dataproc>=5.4.0
- google-cloud-dataproc-metastore>=1.10.0
- google-cloud-dataproc-metastore>=1.12.0
- google-cloud-dlp>=3.12.0
- google-cloud-kms>=2.15.0
- google-cloud-language>=2.9.0
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -426,7 +426,7 @@
"google-cloud-dataflow-client>=0.8.2",
"google-cloud-dataform>=0.5.0",
"google-cloud-dataplex>=1.4.2",
"google-cloud-dataproc-metastore>=1.10.0",
"google-cloud-dataproc-metastore>=1.12.0",
"google-cloud-dataproc>=5.4.0",
"google-cloud-dlp>=3.12.0",
"google-cloud-kms>=2.15.0",
@@ -19,45 +19,221 @@
Example Airflow DAG that shows how to check Hive partition existence
using Dataproc Metastore Sensor.
Note that Metastore service must be configured to use gRPC endpoints,
Note that Metastore service must be configured to use gRPC endpoints.
"""
from __future__ import annotations

import datetime
import os

from airflow import models
from airflow.decorators import task
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreCreateServiceOperator,
    DataprocMetastoreDeleteServiceOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.dataproc_metastore import MetastoreHivePartitionSensor
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

DAG_ID = "dataproc_metastore_hive_partition_sensor"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "hive_partition_sensor"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
REGION = "us-central1"
NETWORK = "default"

SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
REGION = "europe-west1"
TABLE_NAME = "test_table"
PARTITION_1 = "column1=value1"
PARTITION_2 = "column2=value2/column3=value3"
METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
METASTORE_TIMEOUT = 2400
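# The Metastore service is configured with gRPC endpoints, which MetastoreHivePartitionSensor requires.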
METASTORE_SERVICE = {
"name": METASTORE_SERVICE_ID,
"hive_metastore_config": {
"endpoint_protocol": "GRPC",
},
"network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
}
METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
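# The Dataproc cluster is attached to the Metastore service via "metastore_config" below.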
DATAPROC_CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-2",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-2",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"metastore_config": {
"dataproc_metastore_service": METASTORE_SERVICE_QFN,
},
"gce_cluster_config": {
"service_account_scopes": [
"https://www.googleapis.com/auth/cloud-platform",
],
},
}

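# Hive table, partition column, and partition values that the sensor will wait for.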
TABLE_NAME = "transactions_partitioned"
COLUMN = "TransactionType"
PARTITION_1 = f"{COLUMN}=credit".lower()
PARTITION_2 = f"{COLUMN}=debit".lower()
SOURCE_DATA_BUCKET = "airflow-system-tests-resources"
SOURCE_DATA_PATH = "dataproc/hive"
SOURCE_DATA_FILE_NAME = "part-00000.parquet"
EXTERNAL_TABLE_BUCKET = "{{task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task', key='bucket')}}"
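# Hive queries executed on the cluster: create an external table over the copied Parquet data,
# create a partitioned table, and populate it using dynamic partitioning.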
QUERY_CREATE_EXTERNAL_TABLE = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS transactions
(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
STORED AS PARQUET
LOCATION 'gs://{EXTERNAL_TABLE_BUCKET}/{SOURCE_DATA_PATH}';
"""
QUERY_CREATE_PARTITIONED_TABLE = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {TABLE_NAME}
(SubmissionDate DATE, TransactionAmount DOUBLE)
PARTITIONED BY ({COLUMN} STRING);
"""
QUERY_COPY_DATA_WITH_PARTITIONS = f"""
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE {TABLE_NAME} PARTITION ({COLUMN})
SELECT SubmissionDate,TransactionAmount,TransactionType FROM transactions;
"""

with models.DAG(
    DAG_ID,
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@once",
    catchup=False,
    tags=["example", "dataproc", "metastore"],
    tags=["example", "dataproc", "metastore", "partition", "hive", "sensor"],
) as dag:

    create_metastore_service = DataprocMetastoreCreateServiceOperator(
        task_id="create_metastore_service",
        region=REGION,
        project_id=PROJECT_ID,
        service=METASTORE_SERVICE,
        service_id=METASTORE_SERVICE_ID,
        timeout=METASTORE_TIMEOUT,
    )

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        cluster_name=DATAPROC_CLUSTER_NAME,
        project_id=PROJECT_ID,
        cluster_config=DATAPROC_CLUSTER_CONFIG,
        region=REGION,
    )

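    # The warehouse bucket name is only known after the Metastore service is created, so it is read from XCom.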
    @task(task_id="get_hive_warehouse_bucket_task")
    def get_hive_warehouse_bucket(**kwargs):
        """Returns Hive Metastore Warehouse GCS bucket name."""
        ti = kwargs["ti"]
        metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
        config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
        destination_dir: str = config_overrides["hive.metastore.warehouse.dir"]
        bucket, _ = _parse_gcs_url(destination_dir)
        ti.xcom_push(key="bucket", value=bucket)

    get_hive_warehouse_bucket_task = get_hive_warehouse_bucket()

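    # Copy the source Parquet file into the Hive warehouse bucket.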
    copy_source_data = GCSToGCSOperator(
        task_id="copy_source_data",
        source_bucket=SOURCE_DATA_BUCKET,
        source_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
        destination_bucket=EXTERNAL_TABLE_BUCKET,
        destination_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
    )

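    # Submit the Hive queries as Dataproc jobs on the cluster.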
    create_external_table = DataprocSubmitJobOperator(
        task_id="create_external_table",
        job={
            "reference": {"project_id": PROJECT_ID},
            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
            "hive_job": {"query_list": {"queries": [QUERY_CREATE_EXTERNAL_TABLE]}},
        },
        region=REGION,
        project_id=PROJECT_ID,
    )

    create_partitioned_table = DataprocSubmitJobOperator(
        task_id="create_partitioned_table",
        job={
            "reference": {"project_id": PROJECT_ID},
            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
            "hive_job": {"query_list": {"queries": [QUERY_CREATE_PARTITIONED_TABLE]}},
        },
        region=REGION,
        project_id=PROJECT_ID,
    )

    partition_data = DataprocSubmitJobOperator(
        task_id="partition_data",
        job={
            "reference": {"project_id": PROJECT_ID},
            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
            "hive_job": {"query_list": {"queries": [QUERY_COPY_DATA_WITH_PARTITIONS]}},
        },
        region=REGION,
        project_id=PROJECT_ID,
    )

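    # Wait until the requested partitions appear in the Hive table through the Metastore service.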
    # [START how_to_cloud_dataproc_metastore_hive_partition_sensor]
    sensor = MetastoreHivePartitionSensor(
    hive_partition_sensor = MetastoreHivePartitionSensor(
        task_id="hive_partition_sensor",
        service_id=SERVICE_ID,
        service_id=METASTORE_SERVICE_ID,
        region=REGION,
        table=TABLE_NAME,
        partitions=[PARTITION_1, PARTITION_2],
    )
    # [END how_to_cloud_dataproc_metastore_hive_partition_sensor]

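    # Teardown tasks: delete the Dataproc cluster, the Metastore service, and the warehouse bucket.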
    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name=DATAPROC_CLUSTER_NAME,
        project_id=PROJECT_ID,
        region=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
        task_id="delete_metastore_service",
        service_id=METASTORE_SERVICE_ID,
        project_id=PROJECT_ID,
        region=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_warehouse_bucket = GCSDeleteBucketOperator(
        task_id="delete_warehouse_bucket",
        bucket_name=EXTERNAL_TABLE_BUCKET,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # TEST SETUP
    (
        create_metastore_service
        >> create_cluster
        >> get_hive_warehouse_bucket_task
        >> copy_source_data
        >> create_external_table
        >> create_partitioned_table
        >> partition_data
    )
    (
        create_metastore_service
        # TEST BODY
        >> hive_partition_sensor
        # TEST TEARDOWN
        >> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure