Add Spot Instances support with Dataproc Operators (#31644)
VladaZakharova committed Jul 26, 2023
1 parent 48fa7b5 commit 4c2ef99
Showing 8 changed files with 85 additions and 5 deletions.
25 changes: 23 additions & 2 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -26,6 +26,7 @@
import uuid
import warnings
from datetime import datetime, timedelta
from enum import Enum
from typing import TYPE_CHECKING, Any, Sequence

from google.api_core import operation # type: ignore
@@ -64,6 +65,14 @@
from airflow.utils.context import Context


class PreemptibilityType(Enum):
"""Contains possible Type values of Preemptibility applicable for every secondary worker of Cluster."""

PREEMPTIBLE = "PREEMPTIBLE"
SPOT = "SPOT"
PREEMPTIBILITY_UNSPECIFIED = "PREEMPTIBILITY_UNSPECIFIED"


class ClusterGenerator:
"""Create a new Dataproc Cluster.
@@ -109,7 +118,13 @@ class ClusterGenerator:
Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
``pd-standard`` (Persistent Disk Hard Disk Drive).
:param worker_disk_size: Disk size for the worker nodes
:param num_preemptible_workers: The # of preemptible worker nodes to spin up
:param num_preemptible_workers: The # of VM instances in the instance group as secondary workers
inside the cluster with Preemptibility enabled by default.
Note that it is not possible to mix non-preemptible and preemptible secondary workers in
one cluster.
:param preemptibility: The type of Preemptibility applicable for every secondary worker, see
https://cloud.google.com/dataproc/docs/reference/rpc/ \
google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceGroupConfig.Preemptibility
:param zone: The zone where the cluster will be located. Set to None to auto-zone. (templated)
:param network_uri: The network uri to be used for machine communication, cannot be
specified with subnetwork_uri
@@ -164,6 +179,7 @@ def __init__(
worker_disk_type: str = "pd-standard",
worker_disk_size: int = 1024,
num_preemptible_workers: int = 0,
preemptibility: str = PreemptibilityType.PREEMPTIBLE.value,
service_account: str | None = None,
service_account_scopes: list[str] | None = None,
idle_delete_ttl: int | None = None,
@@ -177,6 +193,7 @@
self.num_masters = num_masters
self.num_workers = num_workers
self.num_preemptible_workers = num_preemptible_workers
self.preemptibility = self._set_preemptibility_type(preemptibility)
self.storage_bucket = storage_bucket
self.init_actions_uris = init_actions_uris
self.init_action_timeout = init_action_timeout
@@ -220,6 +237,9 @@ def __init__(
if self.single_node and self.num_preemptible_workers > 0:
raise ValueError("Single node cannot have preemptible workers.")

def _set_preemptibility_type(self, preemptibility: str) -> PreemptibilityType:
return PreemptibilityType(preemptibility.upper())
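# Usage sketch (illustrative, not part of the diff): ``upper()`` makes the
# lookup case-insensitive, so "Spot", "spot" and "SPOT" all resolve to
# PreemptibilityType.SPOT, while an unknown value raises a ValueError from
# the Enum constructor:
#   PreemptibilityType("Spot".upper())   # -> PreemptibilityType.SPOT
#   PreemptibilityType("bogus".upper())  # raises ValueError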

def _get_init_action_timeout(self) -> dict:
match = re.match(r"^(\d+)([sm])$", self.init_action_timeout)
if match:
@@ -328,6 +348,7 @@ def _build_cluster_data(self):
"boot_disk_size_gb": self.worker_disk_size,
},
"is_preemptible": True,
"preemptibility": self.preemptibility.value,
}
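# Illustrative result (mirroring the updated tests): with
# num_preemptible_workers=4 and preemptibility="SPOT", the generated cluster
# data includes a secondary worker config such as:
#   "secondary_worker_config": {
#       "num_instances": 4,
#       "is_preemptible": True,
#       "preemptibility": "SPOT",
#       ...
#   }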

if self.storage_bucket:
@@ -2116,7 +2137,7 @@ class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
allowed timeout is 1 day.
:param request_id: Optional. A unique id used to identify the request. If the server receives two
``UpdateClusterRequest`` requests with the same id, then the second request will be ignored and the
first ``google.longrunning.Operation`` created and stored in the backend is returned.
first ``google.longrunning.Operation`` created and stored in the backend is returned.
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
30 changes: 27 additions & 3 deletions docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -37,8 +37,21 @@ Prerequisite Tasks
Create a Cluster
----------------

Before you create a dataproc cluster you need to define the cluster.
It describes the identifying information, config, and status of a cluster of Compute Engine instances.
When you create a Dataproc cluster, you can choose Compute Engine as the deployment platform.
In this configuration, Dataproc automatically provisions the required Compute Engine VM instances to run the cluster.
The VM instances are used for the main node, the primary worker nodes and the secondary worker nodes (if specified).
These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and
orchestration required for the big data processing tasks.
The node configuration you provide describes the primary and secondary nodes as well as the status of a cluster of
Compute Engine instances.
When configuring secondary worker nodes, you can specify their number and type. By
enabling the Preemptible option to use Preemptible VMs (equivalent to Spot instances) for those nodes, you
can take advantage of the cost savings these instances provide for your Dataproc workloads.
The primary node, which typically hosts the main and various control services of the cluster, does not offer the
Preemptible option, because it is crucial for the primary node to maintain stability and availability.
Once a cluster is created, its configuration settings, including the preemptibility of secondary worker nodes,
cannot be modified directly.
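
For illustration, secondary workers backed by Spot VMs can be requested through
``ClusterGenerator`` (the project ID and worker counts below are placeholder values):

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

    CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
        project_id="your-project-id",
        num_workers=2,
        num_preemptible_workers=2,
        # one of "PREEMPTIBLE", "SPOT" or "PREEMPTIBILITY_UNSPECIFIED"
        preemptibility="SPOT",
    ).make()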

For more information about the available fields to pass when creating a cluster, visit `Dataproc create cluster API. <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__

A cluster configuration can look as follows:
@@ -58,7 +71,18 @@ With this configuration we can create the cluster:
:start-after: [START how_to_cloud_dataproc_create_cluster_operator]
:end-before: [END how_to_cloud_dataproc_create_cluster_operator]

For create Dataproc cluster in Google Kubernetes Engine you should use this cluster configuration:
Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters,
Dataproc on GKE virtual clusters do not include separate main and worker VMs. Instead, when you create a Dataproc on
GKE virtual cluster, Dataproc on GKE creates node pools within a GKE cluster. Dataproc on GKE jobs are run as pods on
these node pools. The node pools and scheduling of pods on the node pools are managed by GKE.

When creating a GKE Dataproc cluster, you can specify whether the underlying compute resources use Preemptible VMs.
GKE supports Preemptible VMs as a cost-saving measure.
With this option enabled, GKE provisions the cluster nodes on Preemptible VMs. Alternatively, you can create the
nodes as Spot VM instances, which are the latest update to legacy Preemptible VMs.
This can be beneficial for running Dataproc workloads on GKE while optimizing costs.
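
For illustration, assuming ``PROJECT_ID``, ``REGION`` and ``GKE_CLUSTER_NAME`` are defined
(as in the system test below), a node pool entry can request Preemptible VMs through its node config:

.. code-block:: python

    # One node pool entry inside the virtual cluster configuration; the
    # ``preemptible`` flag asks GKE to back the pool's nodes with Preemptible VMs.
    node_pool_target = [
        {
            "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
            "roles": ["DEFAULT"],
            "node_pool_config": {"config": {"preemptible": True}},
        }
    ]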

To create a Dataproc cluster in Google Kubernetes Engine, you can pass a cluster configuration:

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
:language: python
4 changes: 4 additions & 0 deletions docs/spelling_wordlist.txt
@@ -769,6 +769,7 @@ InspectContentResponse
InspectTemplate
instafail
installable
InstanceGroupConfig
instanceTemplates
instantiation
integrations
@@ -1133,6 +1134,9 @@ precheck
Precommit
preconfigured
PredictionServiceClient
Preemptibility
preemptibility
Preemptible
preemptible
prefetch
prefetched
4 changes: 4 additions & 0 deletions tests/providers/google/cloud/operators/test_dataproc.py
@@ -114,6 +114,7 @@
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"is_preemptible": True,
"preemptibility": "SPOT",
},
"software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
"lifecycle_config": {
@@ -174,6 +175,7 @@
"machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
"disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
"is_preemptible": True,
"preemptibility": "SPOT",
},
"software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
"lifecycle_config": {
@@ -372,6 +374,7 @@ def test_build(self):
worker_disk_type="worker_disk_type",
worker_disk_size=256,
num_preemptible_workers=4,
preemptibility="Spot",
region="region",
service_account="service_account",
service_account_scopes=["service_account_scopes"],
@@ -409,6 +412,7 @@ def test_build_with_custom_image_family(self):
worker_disk_type="worker_disk_type",
worker_disk_size=256,
num_preemptible_workers=4,
preemptibility="Spot",
region="region",
service_account="service_account",
service_account_scopes=["service_account_scopes"],
@@ -52,6 +52,16 @@
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 1024,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
}

# Update options
@@ -59,6 +59,8 @@
storage_bucket=BUCKET_NAME,
init_actions_uris=[f"gs://{BUCKET_NAME}/{INIT_FILE}"],
metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
num_preemptible_workers=1,
preemptibility="PREEMPTIBLE",
).make()

# [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
@@ -68,6 +68,11 @@
{
"node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp", # noqa
"roles": ["DEFAULT"],
"node_pool_config": {
"config": {
"preemptible": True,
}
},
}
],
},
@@ -54,6 +54,16 @@
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 1024,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
}

# [END how_to_cloud_dataproc_create_cluster]
