Clean up default_args usage in docs (#19803)
This PR aligns `default_args` usage in the docs with the updates that have been made to example DAGs across the board. The main types of updates include:
- Removing `start_date` from being declared in `default_args`.
- Removing the pattern of declaring `default_args` separately from the `DAG()` object.
- Updating `default_args` values to more relevant examples.
- Replacing `DummyOperator` with another operator where it makes the other `default_args` updates relevant and applicable (a before/after sketch of the overall pattern follows this list).
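
For reference, a minimal before/after sketch of the pattern these updates move toward, assuming Airflow 2.x imports; the DAG ids, dates, and argument values below are illustrative and not taken from the changed files:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Old pattern: default_args declared separately and carrying start_date.
default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}
with DAG("pattern_before", default_args=default_args, schedule_interval="@daily") as dag:
    BashOperator(task_id="hello", bash_command="echo Hello World!")

# New pattern: start_date passed to DAG() directly, default_args declared inline
# with task-level defaults that operators actually inherit (e.g. retries).
with DAG(
    "pattern_after",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    BashOperator(task_id="hello", bash_command="echo Hello World!")
```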
josh-fell committed Nov 25, 2021
1 parent 4bf85cf commit 744d11b
Showing 11 changed files with 78 additions and 82 deletions.
14 changes: 7 additions & 7 deletions airflow/example_dags/example_subdag_operator.py
@@ -27,12 +27,12 @@

DAG_NAME = 'example_subdag_operator'

args = {
'owner': 'airflow',
}

with DAG(
dag_id=DAG_NAME, default_args=args, start_date=days_ago(2), schedule_interval="@once", tags=['example']
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=days_ago(2),
schedule_interval="@once",
tags=['example'],
) as dag:

start = DummyOperator(
@@ -41,7 +41,7 @@

section_1 = SubDagOperator(
task_id='section-1',
subdag=subdag(DAG_NAME, 'section-1', args),
subdag=subdag(DAG_NAME, 'section-1', dag.default_args),
)

some_other_task = DummyOperator(
@@ -50,7 +50,7 @@

section_2 = SubDagOperator(
task_id='section-2',
subdag=subdag(DAG_NAME, 'section-2', args),
subdag=subdag(DAG_NAME, 'section-2', dag.default_args),
)

end = DummyOperator(
51 changes: 24 additions & 27 deletions airflow/example_dags/tutorial.py
@@ -34,37 +34,34 @@

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
with DAG(
'tutorial',
default_args=default_args,
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
},
# [END default_args]
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
14 changes: 5 additions & 9 deletions airflow/example_dags/tutorial_etl_dag.py
@@ -37,18 +37,14 @@

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
}
# [END default_args]

# [START instantiate_dag]
with DAG(
'tutorial_etl_dag',
default_args=default_args,
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={'retries': 2},
# [END default_args]
description='ETL DAG tutorial',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
@@ -72,7 +72,7 @@
# [END howto_operator_gcf_deploy_body]

# [START howto_operator_gcf_default_args]
default_args = {'owner': 'airflow'}
default_args = {'retries': '3'}
# [END howto_operator_gcf_default_args]

# [START howto_operator_gcf_deploy_variants]
2 changes: 1 addition & 1 deletion docs/apache-airflow/best-practices.rst
@@ -504,7 +504,7 @@ This is an example test to verify the structure of a code-generated DAG aga
with DAG(
dag_id=TEST_DAG_ID,
schedule_interval="@daily",
default_args={"start_date": DATA_INTERVAL_START},
start_date=DATA_INTERVAL_START,
) as dag:
MyCustomOperator(
task_id=TEST_TASK_ID,
39 changes: 25 additions & 14 deletions docs/apache-airflow/concepts/dags.rst
@@ -195,16 +195,19 @@ Otherwise, you must pass it into each Operator with ``dag=``.
Default Arguments
-----------------

Often, many Operators inside a DAG need the same set of default arguments (such as their ``start_date``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it::
Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it::

default_args = {
'start_date': datetime(2016, 1, 1),
'owner': 'airflow'
}

with DAG('my_dag', default_args=default_args) as dag:
op = DummyOperator(task_id='dummy')
print(op.owner) # "airflow"

with DAG(
dag_id='my_dag',
start_date=datetime(2016, 1, 1),
schedule_interval='@daily',
catchup=False,
default_args={'retries': 2},
) as dag:
op = BashOperator(task_id='dummy', bash_command='echo Hello World!')
print(op.retries) # 2


.. _concepts:dag-decorator:
@@ -464,12 +467,18 @@ Dependency relationships can be applied across all tasks in a TaskGroup with the

TaskGroup also supports ``default_args`` like DAG; it will override the ``default_args`` set at the DAG level::

with DAG(dag_id='dag1', default_args={'start_date': datetime(2016, 1, 1), 'owner': 'dag'}):
with TaskGroup('group1', default_args={'owner': 'group'}):
with DAG(
dag_id='dag1',
start_date=datetime(2016, 1, 1),
schedule_interval="@daily",
catchup=False,
default_args={'retries': 1},
):
with TaskGroup('group1', default_args={'retries': 3}):
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2', owner='task2')
print(task1.owner) # "group"
print(task2.owner) # "task2"
task2 = BashOperator(task_id='task2', bash_command='echo Hello World!', retries=2)
print(task1.retries) # 3
print(task2.retries) # 2

If you want to see a more advanced use of TaskGroup, you can look at the ``example_task_group.py`` example DAG that comes with Airflow.

@@ -539,7 +548,9 @@ This is especially useful if your tasks are built dynamically from configuration
### My great DAG
"""
dag = DAG("my_dag", default_args=default_args)
dag = DAG(
"my_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False
)
dag.doc_md = __doc__
t = BashOperator(task_id="foo", bash_command="echo foo", dag=dag)
16 changes: 5 additions & 11 deletions docs/apache-airflow/dag-run.rst
@@ -114,19 +114,13 @@ in the configuration file. When turned off, the scheduler creates a DAG run only
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"tutorial",
default_args=default_args,
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": timedelta(minutes=3),
},
start_date=datetime(2015, 12, 1),
description="A simple tutorial DAG",
schedule_interval="@daily",
3 changes: 2 additions & 1 deletion docs/apache-airflow/faq.rst
@@ -173,7 +173,8 @@ What's the deal with ``start_date``?

``start_date`` is partly legacy from the pre-DagRun era, but it is still
relevant in many ways. When creating a new DAG, you probably want to set
a global ``start_date`` for your tasks using ``default_args``. The first
a global ``start_date`` for your tasks. This can be done by declaring your
``start_date`` directly in the ``DAG()`` object. The first
DagRun to be created will be based on the ``min(start_date)`` for all your
tasks. From that point on, the scheduler creates new DagRuns based on
your ``schedule_interval`` and the corresponding task instances run as your
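For reference, a minimal sketch of the pattern the updated FAQ text describes, with ``start_date`` declared directly on the ``DAG()`` object rather than in ``default_args`` (the DAG id and date here are illustrative):

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="faq_start_date_example",
        start_date=datetime(2021, 1, 1),  # global start_date applied to every task in this DAG
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        BashOperator(task_id="hello", bash_command="echo Hello World!")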
4 changes: 2 additions & 2 deletions docs/apache-airflow/lineage.rst
@@ -32,11 +32,11 @@ works.
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
14 changes: 5 additions & 9 deletions docs/apache-airflow/timezone.rst
@@ -86,15 +86,13 @@ and ``end_dates`` in your DAG definitions. This is mostly in order to preserve b
case a naive ``start_date`` or ``end_date`` is encountered the default time zone is applied. It is applied
in such a way that it is assumed that the naive date time is already in the default time zone. In other
words if you have a default time zone setting of ``Europe/Amsterdam`` and create a naive datetime ``start_date`` of
``datetime(2017,1,1)`` it is assumed to be a ``start_date`` of Jan 1, 2017 Amsterdam time.
``datetime(2017, 1, 1)`` it is assumed to be a ``start_date`` of Jan 1, 2017 Amsterdam time.

.. code-block:: python
default_args = dict(start_date=datetime(2016, 1, 1), owner="airflow")
dag = DAG("my_dag", default_args=default_args)
op = DummyOperator(task_id="dummy", dag=dag)
print(op.owner) # Airflow
dag = DAG("my_dag", start_date=datetime(2017, 1, 1), default_args={"retries": 3})
op = BashOperator(task_id="dummy", bash_command="Hello World!", dag=dag)
print(op.retries) # 3
Unfortunately, during DST transitions, some datetimes don’t exist or are ambiguous.
In such situations, pendulum raises an exception. That’s why you should always create aware
@@ -134,9 +132,7 @@ using ``pendulum``.
local_tz = pendulum.timezone("Europe/Amsterdam")
default_args = dict(start_date=datetime(2016, 1, 1, tzinfo=local_tz), owner="airflow")
dag = DAG("my_tz_dag", default_args=default_args)
dag = DAG("my_tz_dag", start_date=datetime(2016, 1, 1, tzinfo=local_tz))
op = DummyOperator(task_id="dummy", dag=dag)
print(dag.timezone) # <Timezone [Europe/Amsterdam]>
1 change: 1 addition & 0 deletions docs/apache-airflow/tutorial.rst
@@ -77,6 +77,7 @@ of default parameters that we can use when creating tasks.

.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:dedent: 4
:start-after: [START default_args]
:end-before: [END default_args]

