Add possibility to run DAGs from system tests and see DAGs logs (#17868)
deedmitrij committed Aug 31, 2021
1 parent 9befaee commit 5007806
Showing 99 changed files with 264 additions and 89 deletions.
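
Every hunk below makes the same substitution in the Google provider example DAGs: schedule_interval=None becomes schedule_interval='@once'. A likely rationale, given the commit title, is that a system test executes an example DAG as a one-shot backfill, and backfill derives its run dates from the schedule: a DAG with no schedule yields no runs to execute, while '@once' yields exactly one. A minimal sketch of that usage, assuming the example_dags folder path and the DagBag / DAG.clear() / DAG.run() flow (none of which appear in this diff):

# Sketch only; the folder path and dag_id are assumptions for illustration.
from airflow.models import DagBag

dag_bag = DagBag(
    dag_folder="airflow/providers/google/cloud/example_dags",  # assumed location
    include_examples=False,
)
dag = dag_bag.get_dag("example_gcs")

dag.clear()  # drop state left over from earlier test runs
# With schedule_interval='@once' the backfill between start_date and now
# contains exactly one DagRun; with None there would be nothing to execute.
dag.run()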
@@ -84,7 +84,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 # Example DAG to create dataset, train model_id and deploy it.
 with models.DAG(
     "example_create_and_deploy",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     user_defined_macros={
         "get_target_column_spec": get_target_column_spec,
@@ -196,7 +196,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:
 # Example DAG for AutoML datasets operations
 with models.DAG(
     "example_automl_dataset",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     user_defined_macros={"extract_object_id": extract_object_id},
 ) as example_dag:
@@ -264,7 +264,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:

 with models.DAG(
     "example_gcp_get_deploy",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
 ) as get_deploy_dag:
@@ -289,7 +289,7 @@ def get_target_column_spec(columns_specs: List[Dict], column_name: str) -> str:

 with models.DAG(
     "example_gcp_predict",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
 ) as predict_dag:

@@ -36,7 +36,7 @@
         'retries': 1,
         'retry_delay': timedelta(minutes=5),
     },
-    schedule_interval=None,
+    schedule_interval='@once',
     start_date=datetime(2018, 11, 1),
     tags=['example'],
 ) as dag:

@@ -64,7 +64,7 @@

 with models.DAG(
     "example_gcp_bigquery_dts",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -57,7 +57,7 @@

 with models.DAG(
     "example_bigquery_operations",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
 ) as dag:
@@ -238,10 +238,10 @@

 with models.DAG(
     "example_bigquery_operations_location",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
-):
+) as dag_with_location:
     create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset_with_location",
         dataset_id=LOCATION_DATASET_NAME,

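Note that the second hunk above changes more than the schedule: the anonymous with models.DAG(...): block is bound to the name dag_with_location. A plausible reason, sketched below under that assumption, is that Airflow's DAG file processor discovers DAGs by scanning module-level attributes, so a DAG object never assigned to a name can go unnoticed:

# Illustration of the assumed discovery rule; both dag_ids are made up.
from airflow import models
from airflow.utils.dates import days_ago

with models.DAG("invisible_dag", schedule_interval='@once', start_date=days_ago(1)):
    pass  # no module-level reference survives this block

with models.DAG("visible_dag", schedule_interval='@once', start_date=days_ago(1)) as visible_dag:
    pass  # visible_dag is a module attribute, so the file processor can find it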
@@ -65,7 +65,7 @@

 with models.DAG(
     dag_id,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
     user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_1},

@@ -54,7 +54,7 @@

 with models.DAG(
     dag_id,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
     user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},

@@ -37,7 +37,7 @@

 with models.DAG(
     "example_bigquery_to_bigquery",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
 ) as dag:

@@ -80,7 +80,7 @@

 with models.DAG(
     'example_gcp_bigtable_operators',
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -77,7 +77,7 @@
 with models.DAG(
     "example_gcp_cloud_build",
     default_args=dict(start_date=dates.days_ago(1)),
-    schedule_interval=None,
+    schedule_interval='@once',
     tags=['example'],
 ) as dag:
     # [START howto_operator_create_build_from_storage]

@@ -79,7 +79,7 @@

 with models.DAG(
     "gcp_cloud_memorystore_redis",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag:
@@ -255,7 +255,7 @@

 with models.DAG(
     "gcp_cloud_memorystore_memcached",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag_memcache:

@@ -138,7 +138,7 @@

 with models.DAG(
     'example_gcp_sql',
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -268,7 +268,7 @@ def get_absolute_path(path):

 with models.DAG(
     dag_id='example_gcp_sql_query',
-    schedule_interval=None,
+    schedule_interval='@once',
     start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -101,7 +101,7 @@

 with models.DAG(
     "example_gcp_transfer",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=["example"],
 ) as dag:

@@ -51,7 +51,7 @@

 with models.DAG(
     'example_gcp_compute',
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -91,7 +91,7 @@

 with models.DAG(
     'example_gcp_compute_igm',
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag:

@@ -31,7 +31,7 @@
 with models.DAG(
     'example_compute_ssh',
     default_args=dict(start_date=dates.days_ago(1)),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag:
     # # [START howto_execute_command_on_remote1]

@@ -58,7 +58,7 @@
 FIELD_NAME_2 = "second"
 FIELD_NAME_3 = "first-rename"

-with models.DAG("example_gcp_datacatalog", start_date=days_ago(1), schedule_interval=None) as dag:
+with models.DAG("example_gcp_datacatalog", schedule_interval='@once', start_date=days_ago(1)) as dag:
     # Create
     # [START howto_operator_gcp_datacatalog_create_entry_group]
     create_entry_group = CloudDataCatalogCreateEntryGroupOperator(

@@ -62,7 +62,7 @@

 with models.DAG(
     "example_gcp_dataflow_native_java",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag_native_java:
@@ -111,7 +111,7 @@
     "example_gcp_dataflow_native_python",
     default_args=default_args,
     start_date=days_ago(1),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_native_python:

@@ -146,7 +146,7 @@
     "example_gcp_dataflow_native_python_async",
     default_args=default_args,
     start_date=days_ago(1),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_native_python_async:
     # [START howto_operator_start_python_job_async]
@@ -239,7 +239,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
     "example_gcp_dataflow_template",
     default_args=default_args,
     start_date=days_ago(1),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_template:
     # [START howto_operator_start_template_job]

@@ -46,7 +46,7 @@
 with models.DAG(
     dag_id="example_gcp_dataflow_flex_template_java",
     start_date=days_ago(1),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
 ) as dag_flex_template:
     # [START howto_operator_start_template_job]
     start_flex_template = DataflowStartFlexTemplateOperator(

@@ -37,7 +37,7 @@
 with models.DAG(
     dag_id="example_gcp_dataflow_sql",
     start_date=days_ago(1),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_sql:
     # [START howto_operator_start_sql_job]

@@ -147,7 +147,7 @@

 with models.DAG(
     "example_data_fusion",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=dates.days_ago(1),
 ) as dag:
     # [START howto_cloud_data_fusion_create_instance_operator]

@@ -52,7 +52,7 @@

 with models.DAG(
     "example_dataprep",
-    schedule_interval=None,
+    schedule_interval='@once',
     start_date=dates.days_ago(1),  # Override to match your needs
 ) as dag:
     # [START how_to_dataprep_run_job_group_operator]

@@ -151,7 +151,7 @@
 }


-with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interval=None) as dag:
+with models.DAG("example_gcp_dataproc", schedule_interval='@once', start_date=days_ago(1)) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator]
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",

@@ -42,7 +42,7 @@

 with models.DAG(
     "example_gcp_datastore",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=dates.days_ago(1),
     tags=["example"],
 ) as dag:
@@ -83,7 +83,7 @@
 with models.DAG(
     "example_gcp_datastore_operations",
     start_date=dates.days_ago(1),
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     tags=["example"],
 ) as dag2:
     # [START how_to_allocate_ids]

8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_dlp.py
@@ -61,7 +61,7 @@

 with models.DAG(
     "example_gcp_dlp",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
     tags=['example'],
 ) as dag1:
@@ -110,7 +110,7 @@

 with models.DAG(
     "example_gcp_dlp_info_types",
-    schedule_interval=None,
+    schedule_interval='@once',
     start_date=days_ago(1),
     tags=["example", "dlp", "info-types"],
 ) as dag2:
@@ -152,7 +152,7 @@
 TRIGGER_ID = "example_trigger"

 with models.DAG(
-    "example_gcp_dlp_job", schedule_interval=None, start_date=days_ago(1), tags=["example", "dlp_job"]
+    "example_gcp_dlp_job", schedule_interval='@once', start_date=days_ago(1), tags=["example", "dlp_job"]
 ) as dag3:  # [START howto_operator_dlp_create_job_trigger]
     create_trigger = CloudDLPCreateJobTriggerOperator(
         project_id=GCP_PROJECT,
@@ -195,7 +195,7 @@

 with models.DAG(
     "example_gcp_dlp_deidentify_content",
-    schedule_interval=None,
+    schedule_interval='@once',
     start_date=days_ago(1),
     tags=["example", "dlp", "deidentify"],
 ) as dag4:

@@ -56,7 +56,7 @@

 with models.DAG(
     "example_facebook_ads_to_gcs",
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=days_ago(1),
 ) as dag:

@@ -96,7 +96,7 @@
 with models.DAG(
     'example_gcp_function',
     default_args=default_args,
-    schedule_interval=None,  # Override to match your needs
+    schedule_interval='@once',  # Override to match your needs
     start_date=dates.days_ago(1),
     tags=['example'],
 ) as dag:

4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/example_dags/example_gcs.py
@@ -60,7 +60,7 @@
 with models.DAG(
     "example_gcs",
     start_date=days_ago(1),
-    schedule_interval=None,
+    schedule_interval='@once',
     tags=['example'],
 ) as dag:
     create_bucket1 = GCSCreateBucketOperator(
@@ -160,7 +160,7 @@
 with models.DAG(
     "example_gcs_sensors",
     start_date=days_ago(1),
-    schedule_interval=None,
+    schedule_interval='@once',
     tags=['example'],
 ) as dag2:
     create_bucket = GCSCreateBucketOperator(

@@ -41,7 +41,7 @@
 with models.DAG(
     "example_gcs_timespan_file_transform",
     start_date=days_ago(1),
-    schedule_interval=None,
+    schedule_interval='@once',
     tags=['example'],
 ) as dag:

@@ -36,7 +36,7 @@
 dag = models.DAG(
     dag_id='example_gcs_to_bigquery_operator',
     start_date=days_ago(2),
-    schedule_interval=None,
+    schedule_interval='@once',
     tags=['example'],
 )

@@ -39,7 +39,7 @@
 OBJECT_2 = os.environ.get("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2")

 with models.DAG(
-    "example_gcs_to_gcs", start_date=days_ago(1), schedule_interval=None, tags=['example']
+    "example_gcs_to_gcs", schedule_interval='@once', start_date=days_ago(1), tags=['example']
 ) as dag:
     # [START howto_synch_bucket]
     sync_bucket = GCSSynchronizeBucketsOperator(
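
The commit title also mentions surfacing DAG logs, but none of the hunks shown here touch logging, so that part presumably lives in the files not expanded above. For orientation only (this reflects the Airflow default log layout of the era, not anything this diff establishes), task logs land under $AIRFLOW_HOME/logs, where a test harness could collect them:

# Assumed default layout: logs/<dag_id>/<task_id>/<execution_date>/<try>.log
import os
from pathlib import Path

def dump_task_logs(dag_id: str) -> None:
    log_root = Path(os.environ.get("AIRFLOW_HOME", str(Path.home() / "airflow"))) / "logs" / dag_id
    for log_file in sorted(log_root.rglob("*.log")):
        print(f"===== {log_file} =====")
        print(log_file.read_text())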
