Skip to content

Commit

Permalink
[AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Op (
Browse files Browse the repository at this point in the history
#7685)

* [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator

* fixup! [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator

* fixup! fixup! [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator

* fixup! fixup! fixup! [AIRFLOW-7034] Remove feature: Assigning Dag to task using Bitshift Operator
  • Loading branch information
kaxil committed Mar 12, 2020
1 parent 6140356 commit 137896f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 76 deletions.
18 changes: 18 additions & 0 deletions UPDATING.md
Expand Up @@ -61,6 +61,24 @@ https://developers.google.com/style/inclusive-documentation
-->

### Assigning a task to a DAG using bitwise shift (bit-shift) operators is no longer supported

Previously, you could assign a task to a DAG as follows:

```python
dag = DAG('my_dag')
dummy = DummyOperator(task_id='dummy')

dag >> dummy
```

This is no longer supported. Instead, we recommend using the DAG as a context manager:

```python
with DAG('my_dag'):
    dummy = DummyOperator(task_id='dummy')
```

### Deprecating ignore_first_depends_on_past on the backfill command and defaulting it to True

When doing backfill with `depends_on_past` dags, users will need to pass `ignore_first_depends_on_past`.
Expand Down
26 changes: 4 additions & 22 deletions airflow/models/baseoperator.py
Expand Up @@ -471,46 +471,28 @@ def __hash__(self):
def __rshift__(self, other):
"""
Implements Self >> Other == self.set_downstream(other)
If "Other" is a DAG, the DAG is assigned to the Operator.
"""
from airflow.models.dag import DAG
if isinstance(other, DAG):
# if this dag is already assigned, do nothing
# otherwise, do normal dag assignment
if not (self.has_dag() and self.dag is other):
self.dag = other
else:
self.set_downstream(other)
self.set_downstream(other)
return other

def __lshift__(self, other):
"""
Implements Self << Other == self.set_upstream(other)
If "Other" is a DAG, the DAG is assigned to the Operator.
"""
from airflow.models.dag import DAG
if isinstance(other, DAG):
# if this dag is already assigned, do nothing
# otherwise, do normal dag assignment
if not (self.has_dag() and self.dag is other):
self.dag = other
else:
self.set_upstream(other)
self.set_upstream(other)
return other

def __rrshift__(self, other):
"""
Called for [DAG] >> [Operator] because DAGs don't have
Called for [Operator] >> Operator because lists don't have
__rshift__ operators.
"""
self.__lshift__(other)
return self

def __rlshift__(self, other):
"""
Called for [DAG] << [Operator] because DAGs don't have
Called for [Operator] << Operator because lists don't have
__lshift__ operators.
"""
self.__rshift__(other)
Expand Down
26 changes: 9 additions & 17 deletions airflow/providers/google/cloud/operators/pubsub.py
Expand Up @@ -41,8 +41,7 @@ class PubSubCreateTopicOperator(BaseOperator):
with DAG('successful DAG') as dag:
(
dag
>> PubSubTopicCreateOperator(project='my-project',
PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic')
>> PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic')
Expand All @@ -52,8 +51,7 @@ class PubSubCreateTopicOperator(BaseOperator):
with DAG('failing DAG') as dag:
(
dag
>> PubSubTopicCreateOperator(project='my-project',
PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic')
>> PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic',
Expand Down Expand Up @@ -182,8 +180,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
with DAG('successful DAG') as dag:
(
dag
>> PubSubSubscriptionCreateOperator(
PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic',
subscription='my-subscription')
>> PubSubSubscriptionCreateOperator(
Expand All @@ -196,8 +193,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
with DAG('failing DAG') as dag:
(
dag
>> PubSubSubscriptionCreateOperator(
PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic',
subscription='my-subscription')
>> PubSubSubscriptionCreateOperator(
Expand All @@ -210,7 +206,7 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
with DAG('DAG') as dag:
(
dag >> PubSubSubscriptionCreateOperator(
PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic')
)
Expand Down Expand Up @@ -370,17 +366,15 @@ class PubSubDeleteTopicOperator(BaseOperator):
with DAG('successful DAG') as dag:
(
dag
>> PubSubTopicDeleteOperator(project='my-project',
PubSubTopicDeleteOperator(project='my-project',
topic='non_existing_topic')
)
The operator can be configured to fail if the topic does not exist. ::
with DAG('failing DAG') as dag:
(
dag
>> PubSubTopicCreateOperator(project='my-project',
PubSubTopicCreateOperator(project='my-project',
topic='non_existing_topic',
fail_if_not_exists=True)
)
Expand Down Expand Up @@ -482,8 +476,7 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
with DAG('successful DAG') as dag:
(
dag
>> PubSubSubscriptionDeleteOperator(project='my-project',
PubSubSubscriptionDeleteOperator(project='my-project',
subscription='non-existing')
)
Expand All @@ -493,8 +486,7 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
with DAG('failing DAG') as dag:
(
dag
>> PubSubSubscriptionDeleteOperator(
PubSubSubscriptionDeleteOperator(
project='my-project', subscription='non-existing',
fail_if_not_exists=True)
)
Expand Down
13 changes: 0 additions & 13 deletions docs/concepts.rst
Expand Up @@ -353,19 +353,6 @@ is equivalent to:
op2.set_downstream(op3)
op3.set_upstream(op4)
For convenience, the bitshift operators can also be used with DAGs. For example:

.. code:: python
dag >> op1 >> op2
is equivalent to:

.. code:: python
op1.dag = dag
op1.set_downstream(op2)
We can put this all together to build a simple pipeline:

.. code:: python
Expand Down
29 changes: 5 additions & 24 deletions tests/models/test_taskinstance.py
Expand Up @@ -219,36 +219,17 @@ def test_infer_dag(self):

def test_bitshift_compose_operators(self):
dag = DAG('dag', start_date=DEFAULT_DATE)
op1 = DummyOperator(task_id='test_op_1', owner='test')
op2 = DummyOperator(task_id='test_op_2', owner='test')
op3 = DummyOperator(task_id='test_op_3', owner='test')
op4 = DummyOperator(task_id='test_op_4', owner='test')
op5 = DummyOperator(task_id='test_op_5', owner='test')

# can't compose operators without dags
with self.assertRaises(AirflowException):
op1 >> op2
with dag:
op1 = DummyOperator(task_id='test_op_1', owner='test')
op2 = DummyOperator(task_id='test_op_2', owner='test')
op3 = DummyOperator(task_id='test_op_3', owner='test')

dag >> op1 >> op2 << op3

# make sure dag assignment carries through
# using __rrshift__
self.assertIs(op1.dag, dag)
self.assertIs(op2.dag, dag)
self.assertIs(op3.dag, dag)
op1 >> op2 << op3

# op2 should be downstream of both
self.assertIn(op2, op1.downstream_list)
self.assertIn(op2, op3.downstream_list)

# test dag assignment with __rlshift__
dag << op4
self.assertIs(op4.dag, dag)

# dag assignment with __rrshift__
dag >> op5
self.assertIs(op5.dag, dag)

@patch.object(DAG, 'concurrency_reached')
def test_requeue_over_dag_concurrency(self, mock_concurrency_reached):
mock_concurrency_reached.return_value = True
Expand Down

0 comments on commit 137896f

Please sign in to comment.