Add DataprocCreateWorkflowTemplateOperator (#13338)
* Add DataprocCreateWorkflowTemplateOperator

* fixup! Add DataprocCreateWorkflowTemplateOperator

* fixup! fixup! Add DataprocCreateWorkflowTemplateOperator
turbaszek committed Dec 28, 2020
1 parent f7d354d commit 04ec45f
Showing 5 changed files with 160 additions and 3 deletions.
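For quick orientation, here is a minimal sketch of how the new operator is meant to be used together with the existing DataprocInstantiateWorkflowTemplateOperator. It is distilled from the example DAG changed in this commit; the DAG id, constants, and the trimmed template body below are simplified placeholders, not values the commit prescribes.

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateWorkflowTemplateOperator,
    DataprocInstantiateWorkflowTemplateOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "my-project"        # placeholder project id
REGION = "europe-west1"          # placeholder Dataproc region
WORKFLOW_NAME = "airflow-dataproc-test"
WORKFLOW_TEMPLATE = {            # dict in the same form as the WorkflowTemplate protobuf message
    "id": WORKFLOW_NAME,
    "placement": {"managed_cluster": {"cluster_name": "example-cluster", "config": {}}},
    "jobs": [{"step_id": "pig_job_1", "pig_job": {}}],  # job body trimmed for brevity
}

with models.DAG("example_dataproc_workflow", start_date=days_ago(1), schedule_interval=None) as dag:
    # Register the workflow template with Dataproc; the operator logs and
    # continues if a template with the same id already exists.
    create_workflow_template = DataprocCreateWorkflowTemplateOperator(
        task_id="create_workflow_template",
        template=WORKFLOW_TEMPLATE,
        project_id=PROJECT_ID,
        location=REGION,
    )

    # Instantiate (run) the template registered above.
    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow",
        region=REGION,
        project_id=PROJECT_ID,
        template_id=WORKFLOW_NAME,
    )

    create_workflow_template >> trigger_workflow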
32 changes: 31 additions & 1 deletion airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -25,15 +25,17 @@
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocCreateWorkflowTemplateOperator,
DataprocDeleteClusterOperator,
DataprocInstantiateWorkflowTemplateOperator,
DataprocSubmitJobOperator,
DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-project")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
REGION = os.environ.get("GCP_LOCATION", "europe-west1")
ZONE = os.environ.get("GCP_REGION", "europe-west1-b")
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
@@ -136,6 +138,18 @@
},
}
# [END how_to_cloud_dataproc_hadoop_config]
WORKFLOW_NAME = "airflow-dataproc-test"
WORKFLOW_TEMPLATE = {
"id": WORKFLOW_NAME,
"placement": {
"managed_cluster": {
"cluster_name": CLUSTER_NAME,
"config": CLUSTER_CONFIG,
}
},
"jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
}


with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interval=None) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
@@ -160,6 +174,21 @@
)
# [END how_to_cloud_dataproc_update_cluster_operator]

# [START how_to_cloud_dataproc_create_workflow_template]
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
location=REGION,
)
# [END how_to_cloud_dataproc_create_workflow_template]

# [START how_to_cloud_dataproc_trigger_workflow_template]
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
# [END how_to_cloud_dataproc_trigger_workflow_template]

pig_task = DataprocSubmitJobOperator(
task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
)
@@ -210,6 +239,7 @@
# [END how_to_cloud_dataproc_delete_cluster_operator]

create_cluster >> scale_cluster
scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster
scale_cluster >> hive_task >> delete_cluster
scale_cluster >> pig_task >> delete_cluster
scale_cluster >> spark_sql_task >> delete_cluster
67 changes: 66 additions & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
@@ -1545,6 +1545,70 @@ def execute(self, context):
super().execute(context)


class DataprocCreateWorkflowTemplateOperator(BaseOperator):
"""
Creates a new workflow template.

:param project_id: Required. The ID of the Google Cloud project the workflow template belongs to.
:type project_id: str
:param location: Required. The Cloud Dataproc region in which to handle the request.
:type location: str
:param template: The Dataproc workflow template to create. If a dict is provided,
it must be of the same form as the protobuf message WorkflowTemplate.
:type template: Union[dict, WorkflowTemplate]
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
:type retry: google.api_core.retry.Retry
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
``retry`` is specified, the timeout applies to each individual attempt.
:type timeout: float
:param metadata: Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
"""

template_fields = ("location", "template")
template_fields_renderers = {"template": "json"}

def __init__(
self,
*,
location: str,
template: Dict,
project_id: str,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
):
super().__init__(**kwargs)
self.location = location
self.template = template
self.project_id = project_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context):
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
self.log.info("Creating template")
try:
workflow = hook.create_workflow_template(
location=self.location,
template=self.template,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
self.log.info("Workflow %s created", workflow.name)
except AlreadyExists:
self.log.info("Workflow with given id already exists")


class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
"""
Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
@@ -1596,7 +1660,8 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
:type impersonation_chain: Union[str, Sequence[str]]
"""

template_fields = ['template_id', 'impersonation_chain']
template_fields = ['template_id', 'impersonation_chain', 'request_id', 'parameters']
template_fields_renderers = {"parameters": "json"}

@apply_defaults
def __init__( # pylint: disable=too-many-arguments
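One change in this hunk that is easy to miss: ``request_id`` and ``parameters`` on DataprocInstantiateWorkflowTemplateOperator are now templated fields, with ``parameters`` shown via the "json" renderer in the UI. A hypothetical illustration of what this enables (not part of this commit; the placeholder values and Jinja expressions are invented):

from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow",
    region="europe-west1",            # placeholder region
    project_id="my-project",          # placeholder project id
    template_id="airflow-dataproc-test",
    # Both arguments below are rendered by Jinja at run time now that they are
    # listed in template_fields; the values here are illustrative only.
    request_id="{{ dag.dag_id }}-{{ ds_nodash }}",
    parameters={"RUN_DATE": "{{ ds }}"},
)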
24 changes: 24 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -180,6 +180,30 @@ Example of the configuration for a SparkR:
:start-after: [START how_to_cloud_dataproc_sparkr_config]
:end-before: [END how_to_cloud_dataproc_sparkr_config]

Working with workflow templates
-------------------------------

Dataproc supports creating workflow templates that can be triggered later on.

A workflow template can be created using:
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateWorkflowTemplateOperator`.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_create_workflow_template]
:end-before: [END how_to_cloud_dataproc_create_workflow_template]

Once a workflow template is created, users can trigger it using
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator`:

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_trigger_workflow_template]
:end-before: [END how_to_cloud_dataproc_trigger_workflow_template]


References
^^^^^^^^^^
For further information, take a look at:
1 change: 0 additions & 1 deletion tests/always/test_project_structure.py
@@ -173,7 +173,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
# Base operator. Ignore it
39 changes: 39 additions & 0 deletions tests/providers/google/cloud/operators/test_dataproc.py
@@ -28,6 +28,7 @@
from airflow.providers.google.cloud.operators.dataproc import (
ClusterGenerator,
DataprocCreateClusterOperator,
DataprocCreateWorkflowTemplateOperator,
DataprocDeleteClusterOperator,
DataprocInstantiateInlineWorkflowTemplateOperator,
DataprocInstantiateWorkflowTemplateOperator,
@@ -115,6 +116,18 @@
METADATA = [("key", "value")]
REQUEST_ID = "request_id_uuid"

WORKFLOW_NAME = "airflow-dataproc-test"
WORKFLOW_TEMPLATE = {
"id": WORKFLOW_NAME,
"placement": {
"managed_cluster": {
"cluster_name": CLUSTER_NAME,
"config": CLUSTER,
}
},
"jobs": [{"step_id": "pig_job_1", "pig_job": {}}],
}


def assert_warning(msg: str, warning: Any):
assert any(msg in str(w) for w in warning.warnings)
@@ -914,3 +927,29 @@ def test_execute(self, mock_hook, mock_uuid):
)
job = op.generate_job()
self.assertDictEqual(self.job, job)


class TestDataprocCreateWorkflowTemplateOperator:
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute(self, mock_hook):
op = DataprocCreateWorkflowTemplateOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
location=GCP_LOCATION,
project_id=GCP_PROJECT,
retry=RETRY,
timeout=TIMEOUT,
metadata=METADATA,
template=WORKFLOW_TEMPLATE,
)
op.execute(context={})
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
mock_hook.return_value.create_workflow_template.assert_called_once_with(
location=GCP_LOCATION,
project_id=GCP_PROJECT,
retry=RETRY,
timeout=TIMEOUT,
metadata=METADATA,
template=WORKFLOW_TEMPLATE,
)
