Skip to content

Commit

Permalink
Add Google Cloud Tasks how-to documentation (#20145)
Browse files Browse the repository at this point in the history
  • Loading branch information
kazanzhy committed Dec 11, 2021
1 parent e926275 commit 22341b9
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 0 deletions.
26 changes: 26 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
) as dag:

# Queue operations
# [START create_queue]
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
Expand All @@ -81,31 +82,41 @@
timeout=5,
task_id="create_queue",
)
# [END create_queue]

# [START delete_queue]
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="delete_queue",
)
# [END delete_queue]

# [START resume_queue]
resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="resume_queue",
)
# [END resume_queue]

# [START pause_queue]
pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="pause_queue",
)
# [END pause_queue]

# [START purge_queue]
purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="purge_queue",
)
# [END purge_queue]

# [START get_queue]
get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID,
Expand All @@ -116,18 +127,23 @@
task_id="get_queue_result",
bash_command=f"echo {get_queue.output}",
)
# [END get_queue]

get_queue >> get_queue_result

# [START update_queue]
update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID,
update_mask={"paths": ["stackdriver_logging_config.sampling_ratio"]},
task_id="update_queue",
)
# [END update_queue]

# [START list_queue]
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
# [END list_queue]

chain(
create_queue,
Expand All @@ -141,6 +157,7 @@
)

# Tasks operations
# [START create_task]
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID,
Expand All @@ -150,25 +167,34 @@
timeout=5,
task_id="create_task_to_run",
)
# [END create_task]

# [START tasks_get]
tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_name=TASK_NAME,
task_id="tasks_get",
)
# [END tasks_get]

# [START run_task]
run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_name=TASK_NAME,
task_id="run_task",
)
# [END run_task]

# [START list_tasks]
list_tasks = CloudTasksTasksListOperator(location=LOCATION, queue_name=QUEUE_ID, task_id="list_tasks")
# [END list_tasks]

# [START delete_task]
delete_task = CloudTasksTaskDeleteOperator(
location=LOCATION, queue_name=QUEUE_ID, task_name=TASK_NAME, task_id="delete_task"
)
# [END delete_task]

chain(purge_queue, create_task, tasks_get, list_tasks, run_task, delete_task, delete_queue)
52 changes: 52 additions & 0 deletions airflow/providers/google/cloud/operators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class CloudTasksQueueCreateOperator(BaseOperator):
"""
Creates a queue in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueueCreateOperator`
:param location: The location name in which the queue will be created.
:type location: str
:param task_queue: The task queue to create.
Expand Down Expand Up @@ -140,6 +144,10 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
"""
Updates a queue in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueueUpdateOperator`
:param task_queue: The task queue to update.
This method creates the queue if it does not exist and updates the queue if
it does exist. The queue's name must be specified.
Expand Down Expand Up @@ -240,6 +248,10 @@ class CloudTasksQueueGetOperator(BaseOperator):
"""
Gets a queue from Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueueGetOperator`
:param location: The location name in which the queue was created.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -322,6 +334,10 @@ class CloudTasksQueuesListOperator(BaseOperator):
"""
Lists queues from Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueuesListOperator`
:param location: The location name in which the queues were created.
:type location: str
:param project_id: (Optional) The ID of the Google Cloud project that owns the Cloud Tasks.
Expand Down Expand Up @@ -409,6 +425,10 @@ class CloudTasksQueueDeleteOperator(BaseOperator):
"""
Deletes a queue from Cloud Tasks, even if it has tasks in it.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueueDeleteOperator`
:param location: The location name in which the queue will be deleted.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -488,6 +508,10 @@ class CloudTasksQueuePurgeOperator(BaseOperator):
"""
Purges a queue by deleting all of its tasks from Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueuePurgeOperator`
:param location: The location name in which the queue will be purged.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -570,6 +594,10 @@ class CloudTasksQueuePauseOperator(BaseOperator):
"""
Pauses a queue in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueuePauseOperator`
:param location: The location name in which the queue will be paused.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -652,6 +680,10 @@ class CloudTasksQueueResumeOperator(BaseOperator):
"""
Resumes a queue in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksQueueResumeOperator`
:param location: The location name in which the queue will be resumed.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -734,6 +766,10 @@ class CloudTasksTaskCreateOperator(BaseOperator):
"""
Creates a task in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksTaskCreateOperator`
:param location: The location name in which the task will be created.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -836,6 +872,10 @@ class CloudTasksTaskGetOperator(BaseOperator):
"""
Gets a task from Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksTaskGetOperator`
:param location: The location name in which the task was created.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -930,6 +970,10 @@ class CloudTasksTasksListOperator(BaseOperator):
"""
Lists the tasks in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksTasksListOperator`
:param location: The location name in which the tasks were created.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -1024,6 +1068,10 @@ class CloudTasksTaskDeleteOperator(BaseOperator):
"""
Deletes a task from Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksTaskDeleteOperator`
:param location: The location name in which the task will be deleted.
:type location: str
:param queue_name: The queue's name.
Expand Down Expand Up @@ -1109,6 +1157,10 @@ class CloudTasksTaskRunOperator(BaseOperator):
"""
Forces to run a task in Cloud Tasks.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudTasksTaskRunOperator`
:param location: The location name in which the task was created.
:type location: str
:param queue_name: The queue's name.
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ integrations:
tags: [gcp]
- integration-name: Google Cloud Tasks
external-doc-url: https://cloud.google.com/tasks/
how-to-guide:
- /docs/apache-airflow-providers-google/operators/cloud/tasks.rst
logo: /integration-logos/gcp/Cloud-Tasks.png
tags: [gcp]
- integration-name: Google Cloud Text-to-Speech
Expand Down

0 comments on commit 22341b9

Please sign in to comment.