Skip to content

Commit

Permalink
migrate system test gcs_to_bigquery into new design (#22753)
Browse files Browse the repository at this point in the history
  • Loading branch information
joppevos committed Apr 13, 2022
1 parent 49e336a commit 9a623e9
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Use the
:class:`~airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator`
to execute a BigQuery load job.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
.. exampleinclude:: /../../tests/system/providers/google/gcs/example_gcs_to_bigquery.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcs_to_bigquery]
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,24 @@
BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.utils.trigger_rule import TriggerRule

DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'airflow_test')
TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'gcs_to_bq_table')
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "gcs_to_bigquery_operator"

DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
TABLE_NAME = "test"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

with models.DAG(
dag_id='example_gcs_to_bigquery_operator',
dag_id=DAG_ID,
schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
schedule_interval='@once',
tags=['example'],
tags=['example', "gcs"],
) as dag:
create_test_dataset = BigQueryCreateEmptyDatasetOperator(
task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME
task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME, project_id=PROJECT_ID
)

# [START howto_operator_gcs_to_bigquery]
Expand All @@ -62,6 +67,25 @@
task_id='delete_airflow_test_dataset',
dataset_id=DATASET_NAME,
delete_contents=True,
trigger_rule=TriggerRule.ALL_DONE,
)

(
# TEST SETUP
create_test_dataset
# TEST BODY
>> load_csv
# TEST TEARDOWN
>> delete_test_dataset
)

create_test_dataset >> load_csv >> delete_test_dataset
from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit 9a623e9

Please sign in to comment.