|
60 | 60 | DataprocSubmitSparkJobOperator,
|
61 | 61 | DataprocSubmitSparkSqlJobOperator,
|
62 | 62 | DataprocUpdateClusterOperator,
|
| 63 | + InstanceFlexibilityPolicy, |
| 64 | + InstanceSelection, |
63 | 65 | )
|
64 | 66 | from airflow.providers.google.cloud.triggers.dataproc import (
|
65 | 67 | DataprocBatchTrigger,
|
|
112 | 114 | "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
|
113 | 115 | "image_uri": "https://www.googleapis.com/compute/beta/projects/"
|
114 | 116 | "custom_image_project_id/global/images/custom_image",
|
| 117 | + "min_num_instances": 1, |
115 | 118 | },
|
116 | 119 | "secondary_worker_config": {
|
117 | 120 | "num_instances": 4,
|
|
132 | 135 | {"executable_file": "init_actions_uris", "execution_timeout": {"seconds": 600}}
|
133 | 136 | ],
|
134 | 137 | "endpoint_config": {},
|
| 138 | + "auxiliary_node_groups": [ |
| 139 | + { |
| 140 | + "node_group": { |
| 141 | + "roles": ["DRIVER"], |
| 142 | + "node_group_config": { |
| 143 | + "num_instances": 2, |
| 144 | + }, |
| 145 | + }, |
| 146 | + "node_group_id": "cluster_driver_pool", |
| 147 | + } |
| 148 | + ], |
135 | 149 | }
|
136 | 150 | VIRTUAL_CLUSTER_CONFIG = {
|
137 | 151 | "kubernetes_cluster_config": {
|
|
197 | 211 | },
|
198 | 212 | }
|
199 | 213 |
|
# Expected cluster config for ``test_build_with_flex_migs``: the dict that
# ``ClusterGenerator.make()`` is asserted to return when a secondary-worker
# instance flexibility policy is supplied.
CONFIG_WITH_FLEX_MIG = {
    "gce_cluster_config": {
        "zone_uri": "https://www.googleapis.com/compute/v1/projects/project_id/zones/zone",
        "metadata": {"metadata": "data"},
        "network_uri": "network_uri",
        "subnetwork_uri": "subnetwork_uri",
        "internal_ip_only": True,
        "tags": ["tags"],
        "service_account": "service_account",
        "service_account_scopes": ["service_account_scopes"],
    },
    "master_config": {
        "num_instances": 2,
        "machine_type_uri": "projects/project_id/zones/zone/machineTypes/master_machine_type",
        "disk_config": {"boot_disk_type": "master_disk_type", "boot_disk_size_gb": 128},
        # Adjacent string literals: one URI split across two lines.
        "image_uri": "https://www.googleapis.com/compute/beta/projects/"
        "custom_image_project_id/global/images/custom_image",
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
        "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
        "image_uri": "https://www.googleapis.com/compute/beta/projects/"
        "custom_image_project_id/global/images/custom_image",
    },
    "secondary_worker_config": {
        "num_instances": 4,
        "machine_type_uri": "projects/project_id/zones/zone/machineTypes/worker_machine_type",
        "disk_config": {"boot_disk_type": "worker_disk_type", "boot_disk_size_gb": 256},
        "is_preemptible": True,
        "preemptibility": "SPOT",
        # Two selections: rank 0 offers machine1/machine2, rank 1 offers machine3.
        "instance_flexibility_policy": {
            "instance_selection_list": [
                {
                    "machine_types": [
                        "projects/project_id/zones/zone/machineTypes/machine1",
                        "projects/project_id/zones/zone/machineTypes/machine2",
                    ],
                    "rank": 0,
                },
                {"machine_types": ["projects/project_id/zones/zone/machineTypes/machine3"], "rank": 1},
            ],
        },
    },
    "software_config": {"properties": {"properties": "data"}, "optional_components": ["optional_components"]},
    "lifecycle_config": {
        "idle_delete_ttl": {"seconds": 60},
        "auto_delete_time": "2019-09-12T00:00:00.000000Z",
    },
    "encryption_config": {"gce_pd_kms_key_name": "customer_managed_key"},
    "autoscaling_config": {"policy_uri": "autoscaling_policy"},
    "config_bucket": "storage_bucket",
    "initialization_actions": [
        {"executable_file": "init_actions_uris", "execution_timeout": {"seconds": 600}}
    ],
    "endpoint_config": {},
}
| 271 | + |
# Label mapping shared by the tests; starts with the raw AIRFLOW_VERSION value.
LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION}

# Replace the version label with a "v"-prefixed form in which every "." and
# "+" of the Airflow version string is turned into "-".
LABELS.update({"airflow-version": "v" + airflow_version.replace(".", "-").replace("+", "-")})
|
@@ -361,10 +433,26 @@ def test_nodes_number(self):
|
361 | 433 | )
|
362 | 434 | assert "num_workers == 0 means single" in str(ctx.value)
|
363 | 435 |
|
def test_min_num_workers_less_than_num_workers(self):
    """Check that ClusterGenerator rejects min_num_workers above num_workers.

    Note: despite the test name, the invalid combination exercised here is
    min_num_workers=4 being *greater than* num_workers=3.
    """
    with pytest.raises(ValueError) as ctx:
        ClusterGenerator(
            num_workers=3, min_num_workers=4, project_id=GCP_PROJECT, cluster_name=CLUSTER_NAME
        )
    # Inspect the message only after the context manager has captured the error.
    assert (
        "The value of min_num_workers must be less than or equal to num_workers. "
        "Provided 4(min_num_workers) and 3(num_workers)." in str(ctx.value)
    )
| 445 | + |
def test_min_num_workers_without_num_workers(self):
    """Check that min_num_workers without num_workers raises ValueError."""
    with pytest.raises(ValueError) as ctx:
        ClusterGenerator(min_num_workers=4, project_id=GCP_PROJECT, cluster_name=CLUSTER_NAME)
    # Inspect the message only after the context manager has captured the error.
    assert "Must specify num_workers when min_num_workers are provided." in str(ctx.value)
| 450 | + |
364 | 451 | def test_build(self):
|
365 | 452 | generator = ClusterGenerator(
|
366 | 453 | project_id="project_id",
|
367 | 454 | num_workers=2,
|
| 455 | + min_num_workers=1, |
368 | 456 | zone="zone",
|
369 | 457 | network_uri="network_uri",
|
370 | 458 | subnetwork_uri="subnetwork_uri",
|
@@ -395,6 +483,8 @@ def test_build(self):
|
395 | 483 | auto_delete_time=datetime(2019, 9, 12),
|
396 | 484 | auto_delete_ttl=250,
|
397 | 485 | customer_managed_key="customer_managed_key",
|
| 486 | + driver_pool_id="cluster_driver_pool", |
| 487 | + driver_pool_size=2, |
398 | 488 | )
|
399 | 489 | cluster = generator.make()
|
400 | 490 | assert CONFIG == cluster
|
@@ -438,6 +528,56 @@ def test_build_with_custom_image_family(self):
|
438 | 528 | cluster = generator.make()
|
439 | 529 | assert CONFIG_WITH_CUSTOM_IMAGE_FAMILY == cluster
|
440 | 530 |
|
def test_build_with_flex_migs(self):
    """Build a cluster config with a secondary-worker flexibility policy.

    Passes an ``InstanceFlexibilityPolicy`` holding two ``InstanceSelection``
    entries (ranks 0 and 1) to ``ClusterGenerator`` and asserts that
    ``make()`` returns exactly ``CONFIG_WITH_FLEX_MIG``.
    """
    generator = ClusterGenerator(
        project_id="project_id",
        num_workers=2,
        zone="zone",
        network_uri="network_uri",
        subnetwork_uri="subnetwork_uri",
        internal_ip_only=True,
        tags=["tags"],
        storage_bucket="storage_bucket",
        init_actions_uris=["init_actions_uris"],
        init_action_timeout="10m",
        metadata={"metadata": "data"},
        custom_image="custom_image",
        custom_image_project_id="custom_image_project_id",
        autoscaling_policy="autoscaling_policy",
        properties={"properties": "data"},
        optional_components=["optional_components"],
        num_masters=2,
        master_machine_type="master_machine_type",
        master_disk_type="master_disk_type",
        master_disk_size=128,
        worker_machine_type="worker_machine_type",
        worker_disk_type="worker_disk_type",
        worker_disk_size=256,
        num_preemptible_workers=4,
        # Mixed case on purpose: the expected config carries "SPOT".
        preemptibility="Spot",
        region="region",
        service_account="service_account",
        service_account_scopes=["service_account_scopes"],
        idle_delete_ttl=60,
        # Both auto-delete options are passed; the expected lifecycle_config
        # contains only auto_delete_time.
        auto_delete_time=datetime(2019, 9, 12),
        auto_delete_ttl=250,
        customer_managed_key="customer_managed_key",
        secondary_worker_instance_flexibility_policy=InstanceFlexibilityPolicy(
            [
                InstanceSelection(
                    [
                        "projects/project_id/zones/zone/machineTypes/machine1",
                        "projects/project_id/zones/zone/machineTypes/machine2",
                    ],
                    0,
                ),
                InstanceSelection(["projects/project_id/zones/zone/machineTypes/machine3"], 1),
            ]
        ),
    )
    cluster = generator.make()
    assert CONFIG_WITH_FLEX_MIG == cluster
| 580 | + |
441 | 581 |
|
442 | 582 | class TestDataprocClusterCreateOperator(DataprocClusterTestBase):
|
443 | 583 | def test_deprecation_warning(self):
|
|
0 commit comments