Skip to content

Commit

Permalink
Consolidate to one schedule param (#25410)
Browse files Browse the repository at this point in the history
Deprecate params `schedule_interval` and `timetable` in favor of new param `schedule`.

Dev list vote thread: https://lists.apache.org/thread/9mlhhsoxk4dkowxlltxdk7p1owk1ffxm
Discuss thread: https://lists.apache.org/thread/do76d17wmt64nc7nps0ld0x1tgmo944m
  • Loading branch information
dstandish committed Aug 10, 2022
1 parent 5923788 commit 1b412c9
Show file tree
Hide file tree
Showing 356 changed files with 643 additions and 659 deletions.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_bash_operator.py
Expand Up @@ -28,7 +28,7 @@

with DAG(
dag_id='example_bash_operator',
schedule_interval='0 0 * * *',
schedule='0 0 * * *',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_branch_datetime_operator.py
Expand Up @@ -31,7 +31,7 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
schedule="@daily",
)

# [START howto_branch_datetime_operator]
Expand All @@ -57,7 +57,7 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
schedule="@daily",
)
# [START howto_branch_datetime_operator_next_day]
empty_task_12 = EmptyOperator(task_id='date_in_range', dag=dag2)
Expand Down
Expand Up @@ -30,7 +30,7 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
schedule="@daily",
) as dag:
# [START howto_operator_day_of_week_branch]
empty_task_1 = EmptyOperator(task_id='branch_true')
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_branch_labels.py
Expand Up @@ -27,7 +27,7 @@

with DAG(
"example_branch_labels",
schedule_interval="@daily",
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_branch_operator.py
Expand Up @@ -32,7 +32,7 @@
dag_id='example_branch_operator',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
schedule="@daily",
tags=['example', 'example2'],
) as dag:
run_this_first = EmptyOperator(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_branch_operator_decorator.py
Expand Up @@ -34,7 +34,7 @@
dag_id='example_branch_python_operator_decorator',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
schedule="@daily",
tags=['example', 'example2'],
) as dag:
run_this_first = EmptyOperator(task_id='run_this_first')
Expand Down
Expand Up @@ -47,7 +47,7 @@ def should_run(**kwargs):

with DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
schedule='*/1 * * * *',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
default_args={'depends_on_past': True},
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_complex.py
Expand Up @@ -27,7 +27,7 @@

with models.DAG(
dag_id="example_complex",
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example', 'example2', 'example3'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_dag_decorator.py
Expand Up @@ -39,7 +39,7 @@ def execute(self, context: Context):

# [START dag_decorator_usage]
@dag(
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
Expand Down
12 changes: 6 additions & 6 deletions airflow/example_dags/example_datasets.py
Expand Up @@ -50,7 +50,7 @@
dag_id='example_dataset_dag1',
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval='@daily',
schedule='@daily',
tags=['upstream'],
) as dag1:
# [START task_outlet]
Expand All @@ -61,7 +61,7 @@
dag_id='example_dataset_dag2',
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval=None,
schedule=None,
tags=['upstream'],
) as dag2:
BashOperator(outlets=[dag2_dataset], task_id='upstream_task_2', bash_command="sleep 5")
Expand All @@ -71,7 +71,7 @@
dag_id='example_dataset_dag3_req_dag1',
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[dag1_dataset],
schedule=[dag1_dataset],
tags=['downstream'],
) as dag3:
# [END dag_dep]
Expand All @@ -85,7 +85,7 @@
dag_id='example_dataset_dag4_req_dag1_dag2',
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[dag1_dataset, dag2_dataset],
schedule=[dag1_dataset, dag2_dataset],
tags=['downstream'],
) as dag4:
BashOperator(
Expand All @@ -98,7 +98,7 @@
dag_id='example_dataset_dag5_req_dag1_D',
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[
schedule=[
dag1_dataset,
Dataset('s3://this-dataset-doesnt-get-triggered'),
],
Expand All @@ -114,7 +114,7 @@
dag_id='example_dataset_dag6_req_DD',
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_on=[
schedule=[
Dataset('s3://unrelated/dataset3.txt'),
Dataset('s3://unrelated/dataset_other_unknown.txt'),
],
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_external_task_marker_dag.py
Expand Up @@ -51,7 +51,7 @@
dag_id="example_external_task_marker_parent",
start_date=start_date,
catchup=False,
schedule_interval=None,
schedule=None,
tags=['example2'],
) as parent_dag:
# [START howto_operator_external_task_marker]
Expand All @@ -65,7 +65,7 @@
with DAG(
dag_id="example_external_task_marker_child",
start_date=start_date,
schedule_interval=None,
schedule=None,
catchup=False,
tags=['example2'],
) as child_dag:
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_kubernetes_executor.py
Expand Up @@ -46,7 +46,7 @@
if k8s:
with DAG(
dag_id='example_kubernetes_executor',
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_latest_only.py
Expand Up @@ -26,7 +26,7 @@

with DAG(
dag_id='latest_only',
schedule_interval=dt.timedelta(hours=4),
schedule=dt.timedelta(hours=4),
start_date=dt.datetime(2021, 1, 1),
catchup=False,
tags=['example2', 'example3'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_latest_only_with_trigger.py
Expand Up @@ -31,7 +31,7 @@

with DAG(
dag_id='latest_only_with_trigger',
schedule_interval=datetime.timedelta(hours=4),
schedule=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_local_kubernetes_executor.py
Expand Up @@ -41,7 +41,7 @@
if k8s:
with DAG(
dag_id='example_local_kubernetes_executor',
schedule_interval=None,
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example3'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_nested_branch_dag.py
Expand Up @@ -32,7 +32,7 @@
dag_id="example_nested_branch_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
schedule="@daily",
tags=["example"],
) as dag:

Expand Down
Expand Up @@ -59,7 +59,7 @@ def print_env_vars(test_mode=None):

with DAG(
"example_passing_params_via_test_command",
schedule_interval='*/1 * * * *',
schedule='*/1 * * * *',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=4),
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_python_operator.py
Expand Up @@ -34,7 +34,7 @@

with DAG(
dag_id='example_python_operator',
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_sla_dag.py
Expand Up @@ -40,7 +40,7 @@ def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):


@dag(
schedule_interval="*/2 * * * *",
schedule="*/2 * * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
sla_miss_callback=sla_callback,
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_subdag_operator.py
Expand Up @@ -32,7 +32,7 @@
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@once",
schedule="@once",
tags=['example'],
) as dag:

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_time_delta_sensor_async.py
Expand Up @@ -31,7 +31,7 @@

with DAG(
dag_id="example_time_delta_sensor_async",
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_controller_dag.py
Expand Up @@ -30,7 +30,7 @@
dag_id="example_trigger_controller_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@once",
schedule="@once",
tags=['example'],
) as dag:
trigger = TriggerDagRunOperator(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_target_dag.py
Expand Up @@ -42,7 +42,7 @@ def run_this_func(dag_run=None):
dag_id="example_trigger_target_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval=None,
schedule=None,
tags=['example'],
) as dag:
run_this = run_this_func()
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_xcom.py
Expand Up @@ -63,7 +63,7 @@ def pull_value_from_bash_push(ti=None):

with DAG(
'example_xcom',
schedule_interval="@once",
schedule="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_xcomargs.py
Expand Up @@ -44,7 +44,7 @@ def print_value(value, ts=None):
dag_id='example_xcom_args',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval=None,
schedule=None,
tags=['example'],
) as dag:
print_value(generate_value())
Expand All @@ -53,7 +53,7 @@ def print_value(value, ts=None):
"example_xcom_args_with_operators",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval=None,
schedule=None,
tags=['example'],
) as dag2:
bash_op1 = BashOperator(task_id="c", bash_command="echo c")
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/subdags/subdag.py
Expand Up @@ -40,7 +40,7 @@ def subdag(parent_dag_name, child_dag_name, args):
default_args=args,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
schedule="@daily",
)

for i in range(5):
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial.py
Expand Up @@ -63,7 +63,7 @@
},
# [END default_args]
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_etl_dag.py
Expand Up @@ -45,7 +45,7 @@
default_args={'retries': 2},
# [END default_args]
description='ETL DAG tutorial',
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_etl.py
Expand Up @@ -30,7 +30,7 @@

# [START instantiate_dag]
@dag(
schedule_interval=None,
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
Expand Down
Expand Up @@ -31,7 +31,7 @@
)
else:

@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
@dag(schedule=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
def tutorial_taskflow_api_etl_virtualenv():
"""
### TaskFlow API example using virtualenv
Expand Down

0 comments on commit 1b412c9

Please sign in to comment.