Skip to content

Commit

Permalink
Add more operators to example DAGs for Cloud Tasks (#13235)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Dec 22, 2020
1 parent 18df31d commit 9042a58
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 48 deletions.
97 changes: 89 additions & 8 deletions airflow/providers/google/cloud/example_dags/example_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,38 @@
and deletes Queues and creates, gets, lists, runs and deletes Tasks in the Google
Cloud Tasks service in the Google Cloud.
"""


import os
from datetime import datetime, timedelta

from google.api_core.retry import Retry
from google.cloud.tasks_v2.types import Queue
from google.protobuf import timestamp_pb2

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.tasks import (
CloudTasksQueueCreateOperator,
CloudTasksQueueDeleteOperator,
CloudTasksQueueGetOperator,
CloudTasksQueuePauseOperator,
CloudTasksQueuePurgeOperator,
CloudTasksQueueResumeOperator,
CloudTasksQueuesListOperator,
CloudTasksQueueUpdateOperator,
CloudTasksTaskCreateOperator,
CloudTasksTaskDeleteOperator,
CloudTasksTaskGetOperator,
CloudTasksTaskRunOperator,
CloudTasksTasksListOperator,
)
from airflow.utils.dates import days_ago
from airflow.utils.helpers import chain

timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(datetime.now() + timedelta(hours=12)) # pylint: disable=no-member

LOCATION = "europe-west1"
QUEUE_ID = "cloud-tasks-queue"
QUEUE_ID = os.environ.get('GCP_TASKS_QUEUE_ID', "cloud-tasks-queue")
TASK_NAME = "task-to-run"


Expand All @@ -61,16 +72,75 @@
tags=['example'],
) as dag:

# Queue operations
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(),
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
queue_name=QUEUE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_queue",
)

create_task_to_run = CloudTasksTaskCreateOperator(
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="delete_queue",
)

resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="resume_queue",
)

pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="pause_queue",
)

purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="purge_queue",
)

get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_id="get_queue",
)

get_queue_result = BashOperator(
task_id="get_queue_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_queue') }}\"",
)
get_queue >> get_queue_result

update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID,
update_mask={"paths": ["stackdriver_logging_config.sampling_ratio"]},
task_id="update_queue",
)

list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")

chain(
create_queue,
update_queue,
pause_queue,
resume_queue,
purge_queue,
get_queue,
list_queue,
delete_queue,
)

# Tasks operations
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task=TASK,
Expand All @@ -80,13 +150,24 @@
task_id="create_task_to_run",
)

tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_name=TASK_NAME,
task_id="tasks_get",
)

run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID,
task_name=TASK_NAME,
retry=Retry(maximum=10.0),
timeout=5,
task_id="run_task",
)

create_queue >> create_task_to_run >> run_task
list_tasks = CloudTasksTasksListOperator(location=LOCATION, queue_name=QUEUE_ID, task_id="list_tasks")

delete_task = CloudTasksTaskDeleteOperator(
location=LOCATION, queue_name=QUEUE_ID, task_name=TASK_NAME, task_id="delete_task"
)

chain(purge_queue, create_task, tasks_get, list_tasks, run_task, delete_task, delete_queue)
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,15 +646,15 @@ def execute(self, context):
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
queues = hook.pause_queue(
queue = hook.pause_queue(
location=self.location,
queue_name=self.queue_name,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
return [MessageToDict(q) for q in queues]
return MessageToDict(queue)


class CloudTasksQueueResumeOperator(BaseOperator):
Expand Down
10 changes: 0 additions & 10 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
}

MISSING_EXAMPLES_FOR_OPERATORS = {
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueDeleteOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueResumeOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePauseOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePurgeOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskGetOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksTasksListOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskDeleteOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueGetOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueUpdateOperator',
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuesListOperator',
# Deprecated operator. Ignore it.
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
'.CloudDataTransferServiceS3ToGCSOperator',
Expand Down

0 comments on commit 9042a58

Please sign in to comment.