Skip to content

Commit

Permalink
Remove references to deprecated operators/params in PubSub operators (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
josh-fell committed Mar 27, 2022
1 parent ca4b8d1 commit 719135a
Showing 1 changed file with 57 additions and 65 deletions.
122 changes: 57 additions & 65 deletions airflow/providers/google/cloud/operators/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,23 @@ class PubSubCreateTopicOperator(BaseOperator):
with DAG('successful DAG') as dag:
(
PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic')
>> PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic')
PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
>> PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
)
The operator can be configured to fail if the topic already exists. ::
with DAG('failing DAG') as dag:
(
PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic')
>> PubSubTopicCreateOperator(project='my-project',
topic='my_new_topic',
fail_if_exists=True)
PubSubCreateTopicOperator(project_id='my-project', topic='my_new_topic')
>> PubSubCreateTopicOperator(
project_id='my-project',
topic='my_new_topic',
fail_if_exists=True,
)
)
Both ``project`` and ``topic`` are templated so you can use
variables in them.
Both ``project_id`` and ``topic`` are templated so you can use Jinja templating in their values.
:param project_id: Optional, the Google Cloud project ID where the topic will be created.
If set to None or missing, the default project_id from the Google Cloud connection is used.
Expand Down Expand Up @@ -192,47 +190,53 @@ class PubSubCreateSubscriptionOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PubSubCreateSubscriptionOperator`
By default, the subscription will be created in ``topic_project``. If
``subscription_project`` is specified and the Google Cloud credentials allow, the
By default, the subscription will be created in ``project_id``. If
``subscription_project_id`` is specified and the Google Cloud credentials allow, the
Subscription can be created in a different project from its topic.
By default, if the subscription already exists, this operator will
not cause the DAG to fail. However, the topic must exist in the project. ::
with DAG('successful DAG') as dag:
(
PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic',
subscription='my-subscription')
>> PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic',
subscription='my-subscription')
PubSubCreateSubscriptionOperator(
project_id='my-project',
topic='my-topic',
subscription='my-subscription'
)
>> PubSubCreateSubscriptionOperator(
project_id='my-project',
topic='my-topic',
subscription='my-subscription',
)
)
The operator can be configured to fail if the subscription already exists.
::
with DAG('failing DAG') as dag:
(
PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic',
subscription='my-subscription')
>> PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic',
subscription='my-subscription', fail_if_exists=True)
PubSubCreateSubscriptionOperator(
project_id='my-project',
topic='my-topic',
subscription='my-subscription',
)
>> PubSubCreateSubscriptionOperator(
project_id='my-project',
topic='my-topic',
subscription='my-subscription',
fail_if_exists=True,
)
)
Finally, subscription is not required. If not passed, the operator will
generated a universally unique identifier for the subscription's name. ::
with DAG('DAG') as dag:
(
PubSubSubscriptionCreateOperator(
topic_project='my-project', topic='my-topic')
)
PubSubCreateSubscriptionOperator(project_id='my-project', topic='my-topic')
``topic_project``, ``topic``, ``subscription``, ``subscription_project_id`` and
``impersonation_chain`` are templated so you can use variables in them.
``project_id``, ``topic``, ``subscription``, ``subscription_project_id`` and
``impersonation_chain`` are templated so you can use Jinja templating in their values.
:param project_id: Optional, the Google Cloud project ID where the topic exists.
If set to None or missing, the default project_id from the Google Cloud connection is used.
Expand Down Expand Up @@ -357,8 +361,8 @@ def __init__(
project_id = topic_project
if subscription_project:
warnings.warn(
"The project_id parameter has been deprecated. You should pass "
"the subscription_project parameter.",
"The subscription_project parameter has been deprecated. You should pass "
"the subscription_project_id parameter.",
DeprecationWarning,
stacklevel=2,
)
Expand Down Expand Up @@ -431,22 +435,16 @@ class PubSubDeleteTopicOperator(BaseOperator):
not cause the DAG to fail. ::
with DAG('successful DAG') as dag:
(
PubSubTopicDeleteOperator(project='my-project',
topic='non_existing_topic')
)
PubSubDeleteTopicOperator(project_id='my-project', topic='non_existing_topic')
The operator can be configured to fail if the topic does not exist. ::
with DAG('failing DAG') as dag:
(
PubSubTopicCreateOperator(project='my-project',
topic='non_existing_topic',
fail_if_not_exists=True)
PubSubDeleteTopicOperator(
project_id='my-project', topic='non_existing_topic', fail_if_not_exists=True,
)
Both ``project`` and ``topic`` are templated so you can use
variables in them.
Both ``project_id`` and ``topic`` are templated so you can use Jinja templating in their values.
:param project_id: Optional, the Google Cloud project ID in which to work (templated).
If set to None or missing, the default project_id from the Google Cloud connection is used.
Expand Down Expand Up @@ -551,24 +549,18 @@ class PubSubDeleteSubscriptionOperator(BaseOperator):
not cause the DAG to fail. ::
with DAG('successful DAG') as dag:
(
PubSubSubscriptionDeleteOperator(project='my-project',
subscription='non-existing')
)
PubSubDeleteSubscriptionOperator(project_id='my-project', subscription='non-existing')
The operator can be configured to fail if the subscription already exists.
::
with DAG('failing DAG') as dag:
(
PubSubSubscriptionDeleteOperator(
project='my-project', subscription='non-existing',
fail_if_not_exists=True)
PubSubDeleteSubscriptionOperator(
project_id='my-project', subscription='non-existing', fail_if_not_exists=True,
)
``project``, and ``subscription`` are templated so you can use
variables in them.
``project_id``, and ``subscription`` are templated so you can use Jinja templating in their values.
:param project_id: Optional, the Google Cloud project ID in which to work (templated).
If set to None or missing, the default project_id from the Google Cloud connection is used.
Expand Down Expand Up @@ -679,14 +671,16 @@ class PubSubPublishMessageOperator(BaseOperator):
m2 = {'data': b'Knock, knock'}
m3 = {'attributes': {'foo': ''}}
t1 = PubSubPublishOperator(
project='my-project',topic='my_topic',
t1 = PubSubPublishMessageOperator(
project_id='my-project',
topic='my_topic',
messages=[m1, m2, m3],
create_topic=True,
dag=dag)
dag=dag,
)
``project`` , ``topic``, and ``messages`` are templated so you can use
variables in them.
``project_id``, ``topic``, and ``messages`` are templated so you can use Jinja templating
in their values.
:param project_id: Optional, the Google Cloud project ID in which to work (templated).
If set to None or missing, the default project_id from the Google Cloud connection is used.
Expand Down Expand Up @@ -775,22 +769,20 @@ class PubSubPullOperator(BaseOperator):
instead.
.. seealso::
For more information on how to use this operator, take a look at the guide:
For more information on how to use this operator and the PubSubPullSensor, take a look at the guide:
:ref:`howto/operator:PubSubPullSensor`
This sensor operator will pull up to ``max_messages`` messages from the
This operator will pull up to ``max_messages`` messages from the
specified PubSub subscription. When the subscription returns messages,
the poke method's criteria will be fulfilled and the messages will be
returned from the operator and passed through XCom for downstream tasks.
the messages will be returned immediately from the operator and passed through XCom for downstream tasks.
If ``ack_messages`` is set to True, messages will be immediately
acknowledged before being returned, otherwise, downstream tasks will be
responsible for acknowledging them.
``project`` and ``subscription`` are templated so you can use
variables in them.
``project_id `` and ``subscription`` are templated so you can use Jinja templating in their values.
:param project: the Google Cloud project ID for the subscription (templated)
:param project_id: the Google Cloud project ID for the subscription (templated)
:param subscription: the Pub/Sub subscription name. Do not include the
full subscription path.
:param max_messages: The maximum number of messages to retrieve per
Expand Down

0 comments on commit 719135a

Please sign in to comment.