Clean-up of google cloud example dags (#19436)
- Use static start_date
- Use catchup=False
- Tidy up the chaining of tasks in some cases
o-nikolas committed Nov 7, 2021
1 parent 3a7e687 commit 9efb989
Showing 24 changed files with 118 additions and 64 deletions.
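Every file follows the same recipe: replace the moving days_ago(1) start date with a fixed datetime, and add catchup=False so the fixed date does not kick off a backfill of historical runs. A dynamic start date is re-evaluated on every DAG parse, which Airflow's best-practices guide advises against. Below is a minimal sketch of the pattern, assuming a trivial placeholder task; the DAG id and task are hypothetical, not taken from the diff:

from datetime import datetime

from airflow import models
from airflow.operators.dummy import DummyOperator  # placeholder task, not part of the diff

# Before (removed throughout this commit):
#   from airflow.utils.dates import days_ago
#   start_date=days_ago(1)

with models.DAG(
    "example_static_start_date",  # hypothetical DAG id
    schedule_interval=None,  # Override to match your needs
    start_date=datetime(2021, 1, 1),  # static: same value on every parse
    catchup=False,  # don't backfill runs between 2021-01-01 and today
    tags=["example"],
) as dag:
    placeholder = DummyOperator(task_id="placeholder")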
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -60,7 +60,8 @@
 with models.DAG(
     "example_automl_text_cls",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as example_dag:
     create_dataset_task = AutoMLCreateDatasetOperator(
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -59,7 +59,8 @@
 with models.DAG(
     "example_automl_text",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -60,7 +60,8 @@
 with models.DAG(
     "example_automl_text_sentiment",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -21,6 +21,7 @@
 """
 import os
 from copy import deepcopy
+from datetime import datetime
 from typing import Dict, List
 
 from airflow import models
@@ -40,7 +41,8 @@
     AutoMLTablesUpdateDatasetOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -85,7 +87,8 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 with models.DAG(
     "example_create_and_deploy",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     user_defined_macros={
         "get_target_column_spec": get_target_column_spec,
         "target": TARGET,
@@ -197,7 +200,8 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 with models.DAG(
     "example_automl_dataset",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
 ) as example_dag:
     create_dataset_task = AutoMLCreateDatasetOperator(
@@ -265,7 +269,8 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 with models.DAG(
     "example_gcp_get_deploy",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example"],
 ) as get_deploy_dag:
     # [START howto_operator_get_model]
@@ -290,7 +295,8 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 with models.DAG(
     "example_gcp_predict",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example"],
 ) as predict_dag:
     # [START howto_operator_prediction]
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -66,7 +66,8 @@
 with models.DAG(
     "example_automl_translation",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -63,7 +63,8 @@
 with models.DAG(
     "example_automl_video",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -64,7 +64,8 @@
 with models.DAG(
     "example_automl_video_tracking",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -61,7 +61,8 @@
 with models.DAG(
     "example_automl_vision",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG that uses Google AutoML services.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.automl import CloudAutoMLHook
@@ -30,7 +31,6 @@
     AutoMLImportDataOperator,
     AutoMLTrainModelOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
 GCP_AUTOML_LOCATION = os.environ.get("GCP_AUTOML_LOCATION", "us-central1")
@@ -63,7 +63,8 @@
 with models.DAG(
     "example_automl_vision_detection",
     schedule_interval=None,  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     user_defined_macros={"extract_object_id": extract_object_id},
     tags=['example'],
 ) as example_dag:
========================================
@@ -21,6 +21,7 @@
 """
 import os
 import time
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.bigquery_dts import (
@@ -29,7 +30,6 @@
     BigQueryDeleteDataTransferConfigOperator,
 )
 from airflow.providers.google.cloud.sensors.bigquery_dts import BigQueryDataTransferServiceTransferRunSensor
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 BUCKET_URI = os.environ.get("GCP_DTS_BUCKET_URI", "gs://INVALID BUCKET NAME/bank-marketing.csv")
@@ -65,7 +65,8 @@
 with models.DAG(
     "example_gcp_bigquery_dts",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_bigquery_create_data_transfer]
========================================
@@ -21,6 +21,7 @@
 """
 import os
 import time
+from datetime import datetime
 from urllib.parse import urlparse
 
 from airflow import models
@@ -38,7 +39,8 @@
     BigQueryUpdateTableSchemaOperator,
     BigQueryUpsertTableOperator,
 )
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 BQ_LOCATION = "europe-north1"
@@ -58,7 +60,8 @@
 with models.DAG(
     "example_bigquery_operations",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example"],
 ) as dag:
     # [START howto_operator_bigquery_create_table]
@@ -238,7 +241,8 @@
 with models.DAG(
     "example_bigquery_operations_location",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example"],
 ) as dag_with_location:
     create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
========================================
@@ -34,7 +34,6 @@
     BigQueryIntervalCheckOperator,
     BigQueryValueCheckOperator,
 )
-from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")
@@ -65,7 +64,8 @@
 with models.DAG(
     dag_id,
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=["example"],
     user_defined_macros={"DATASET": DATASET, "TABLE": TABLE_1},
 ) as dag_with_locations:
========================================
@@ -33,7 +33,6 @@
     BigQueryTableExistenceSensor,
     BigQueryTablePartitionExistenceSensor,
 )
-from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_sensors_dataset")
@@ -55,7 +54,8 @@
 with models.DAG(
     dag_id,
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=["example"],
     user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
     default_args={"project_id": PROJECT_ID},
========================================
@@ -20,6 +20,7 @@
 Example Airflow DAG for Google BigQuery service.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.bigquery import (
@@ -28,7 +29,6 @@
     BigQueryDeleteDatasetOperator,
 )
 from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
-from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
@@ -38,7 +38,8 @@
 with models.DAG(
     "example_bigquery_to_bigquery",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=["example"],
 ) as dag:
     copy_selected_data = BigQueryToBigQueryOperator(
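The remaining bullet, tidying how tasks are chained, does not appear in the hunks captured above. As an illustration only, with hypothetical task names rather than ones from the commit, that kind of clean-up usually collapses one-edge-per-line dependency statements into a single chain() call:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator  # stand-ins for the real operators

with models.DAG(
    "example_chain_tidy",  # hypothetical DAG id
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    create_dataset, import_data, train_model, delete_dataset = (
        DummyOperator(task_id=name)
        for name in ("create_dataset", "import_data", "train_model", "delete_dataset")
    )

    # Before the tidy-up: one statement per dependency edge.
    #   create_dataset >> import_data
    #   import_data >> train_model
    #   train_model >> delete_dataset

    # After: the whole pipeline declared once.
    chain(create_dataset, import_data, train_model, delete_dataset)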
