Skip to content

Commit

Permalink
Add support for creating multiple replicated clusters in Bigtable hook and operator (#10475)
Browse files Browse the repository at this point in the history

* Add support for creating multiple Bigtable replicas

* Flake8 fix
  • Loading branch information
derrickqin committed Aug 24, 2020
1 parent 3a53039 commit b0598b5
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 4 deletions.
22 changes: 20 additions & 2 deletions airflow/providers/google/cloud/hooks/bigtable.py
Expand Up @@ -19,6 +19,7 @@
This module contains a Google Cloud Bigtable Hook.
"""
import enum
import warnings
from typing import Dict, List, Optional, Sequence, Union

from google.cloud.bigtable import Client
Expand Down Expand Up @@ -109,6 +110,7 @@ def create_instance(
main_cluster_id: str,
main_cluster_zone: str,
project_id: str,
replica_clusters: Optional[List[Dict[str, str]]] = None,
replica_cluster_id: Optional[str] = None,
replica_cluster_zone: Optional[str] = None,
instance_display_name: Optional[str] = None,
Expand All @@ -132,11 +134,15 @@ def create_instance(
:param project_id: Optional, Google Cloud Platform project ID where the
BigTable exists. If set to None or missing,
the default project_id from the GCP connection is used.
:type replica_clusters: List[Dict[str, str]]
:param replica_clusters: (optional) A list of replica clusters for the new
instance. Each cluster dictionary contains an id and a zone.
Example: [{"id": "replica-1", "zone": "us-west1-a"}]
:type replica_cluster_id: str
:param replica_cluster_id: (optional) The ID for replica cluster for the new
:param replica_cluster_id: (deprecated) The ID for replica cluster for the new
instance.
:type replica_cluster_zone: str
:param replica_cluster_zone: (optional) The zone for replica cluster.
:param replica_cluster_zone: (deprecated) The zone for replica cluster.
:type instance_type: enums.Instance.Type
:param instance_type: (optional) The type of the instance.
:type instance_display_name: str
Expand Down Expand Up @@ -173,12 +179,24 @@ def create_instance(
)
]
if replica_cluster_id and replica_cluster_zone:
warnings.warn(
"The replica_cluster_id and replica_cluster_zone parameter have been deprecated."
"You should pass the replica_clusters parameter.", DeprecationWarning, stacklevel=2)
clusters.append(instance.cluster(
replica_cluster_id,
replica_cluster_zone,
cluster_nodes,
cluster_storage_type
))
if replica_clusters:
for replica_cluster in replica_clusters:
if "id" in replica_cluster and "zone" in replica_cluster:
clusters.append(instance.cluster(
replica_cluster["id"],
replica_cluster["zone"],
cluster_nodes,
cluster_storage_type
))
operation = instance.create(
clusters=clusters
)
Expand Down
12 changes: 10 additions & 2 deletions airflow/providers/google/cloud/operators/bigtable.py
Expand Up @@ -68,10 +68,15 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
:type project_id: str
:param project_id: Optional, the ID of the GCP project. If set to None or missing,
the default project_id from the GCP connection is used.
:type replica_clusters: List[Dict[str, str]]
:param replica_clusters: (optional) A list of replica clusters for the new
instance. Each cluster dictionary contains an id and a zone.
Example: [{"id": "replica-1", "zone": "us-west1-a"}]
:type replica_cluster_id: str
:param replica_cluster_id: (optional) The ID for replica cluster for the new instance.
:param replica_cluster_id: (deprecated) The ID for replica cluster for the new
instance.
:type replica_cluster_zone: str
:param replica_cluster_zone: (optional) The zone for replica cluster.
:param replica_cluster_zone: (deprecated) The zone for replica cluster.
:type instance_type: enum.IntEnum
:param instance_type: (optional) The type of the instance.
:type instance_display_name: str
Expand Down Expand Up @@ -100,6 +105,7 @@ def __init__(self, *, # pylint: disable=too-many-arguments
main_cluster_id: str,
main_cluster_zone: str,
project_id: Optional[str] = None,
replica_clusters: Optional[List[Dict[str, str]]] = None,
replica_cluster_id: Optional[str] = None,
replica_cluster_zone: Optional[str] = None,
instance_display_name: Optional[str] = None,
Expand All @@ -114,6 +120,7 @@ def __init__(self, *, # pylint: disable=too-many-arguments
self.instance_id = instance_id
self.main_cluster_id = main_cluster_id
self.main_cluster_zone = main_cluster_zone
self.replica_clusters = replica_clusters
self.replica_cluster_id = replica_cluster_id
self.replica_cluster_zone = replica_cluster_zone
self.instance_display_name = instance_display_name
Expand Down Expand Up @@ -145,6 +152,7 @@ def execute(self, context):
instance_id=self.instance_id,
main_cluster_id=self.main_cluster_id,
main_cluster_zone=self.main_cluster_zone,
replica_clusters=self.replica_clusters,
replica_cluster_id=self.replica_cluster_id,
replica_cluster_zone=self.replica_cluster_zone,
instance_display_name=self.instance_display_name,
Expand Down
96 changes: 96 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigtable.py
Expand Up @@ -38,6 +38,13 @@
CBT_CLUSTER = 'cluster'
CBT_ZONE = 'zone'
CBT_TABLE = 'table'
CBT_REPLICA_CLUSTER_ID = 'replica-cluster'
CBT_REPLICA_CLUSTER_ZONE = 'us-west1-b'
CBT_REPLICATE_CLUSTERS = [
{'id': 'replica-1', 'zone': 'us-west1-a'},
{'id': 'replica-2', 'zone': 'us-central1-f'},
{'id': 'replica-3', 'zone': 'us-east1-d'},
]


class TestBigtableHookNoDefaultProjectId(unittest.TestCase):
Expand Down Expand Up @@ -289,6 +296,95 @@ def test_create_instance(self, get_client, instance_create, mock_project_id):
instance_create.assert_called_once_with(clusters=mock.ANY)
self.assertEqual(res.instance_id, 'instance')

@mock.patch(
    'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
    new_callable=PropertyMock,
    return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
)
@mock.patch('google.cloud.bigtable.instance.Instance.cluster')
@mock.patch('google.cloud.bigtable.instance.Instance.create')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_create_instance_with_one_replica_cluster(
    self, get_client, instance_create, cluster, mock_project_id
):
    """Deprecated single-replica path: ``create_instance`` called with
    ``replica_cluster_id``/``replica_cluster_zone`` must build both the main
    cluster and the single replica cluster via ``Instance.cluster``."""
    operation = mock.Mock()
    # NOTE(review): 'result_return_value' is not a standard Mock attribute —
    # this looks like it was meant to be 'operation.result.return_value'.
    # The assertion on 'res' below still passes, so this line is likely dead
    # setup; confirm against how the hook consumes Instance.create()'s result.
    operation.result_return_value = Instance(
        instance_id=CBT_INSTANCE, client=get_client
    )
    instance_create.return_value = operation

    res = self.bigtable_hook_default_project_id.create_instance(
        instance_id=CBT_INSTANCE,
        main_cluster_id=CBT_CLUSTER,
        main_cluster_zone=CBT_ZONE,
        replica_cluster_id=CBT_REPLICA_CLUSTER_ID,
        replica_cluster_zone=CBT_REPLICA_CLUSTER_ZONE,
        cluster_nodes=1,
        cluster_storage_type=enums.StorageType.SSD,
        project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
    )
    # Exactly two clusters should be requested: main + the one replica.
    cluster.assert_has_calls(
        [
            unittest.mock.call(
                CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD
            ),
            unittest.mock.call(
                CBT_REPLICA_CLUSTER_ID, CBT_REPLICA_CLUSTER_ZONE, 1, enums.StorageType.SSD
            ),
        ],
        any_order=True
    )
    get_client.assert_called_once_with(project_id='example-project')
    instance_create.assert_called_once_with(clusters=mock.ANY)
    self.assertEqual(res.instance_id, 'instance')

@mock.patch(
    'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
    new_callable=PropertyMock,
    return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
)
@mock.patch('google.cloud.bigtable.instance.Instance.cluster')
@mock.patch('google.cloud.bigtable.instance.Instance.create')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_create_instance_with_multiple_replica_clusters(
    self, get_client, instance_create, cluster, mock_project_id
):
    """``create_instance`` with ``replica_clusters`` must request one
    ``Instance.cluster`` per replica entry, in addition to the main cluster."""
    create_op = mock.Mock()
    create_op.result_return_value = Instance(
        instance_id=CBT_INSTANCE, client=get_client
    )
    instance_create.return_value = create_op

    result = self.bigtable_hook_default_project_id.create_instance(
        instance_id=CBT_INSTANCE,
        main_cluster_id=CBT_CLUSTER,
        main_cluster_zone=CBT_ZONE,
        replica_clusters=CBT_REPLICATE_CLUSTERS,
        cluster_nodes=1,
        cluster_storage_type=enums.StorageType.SSD,
        project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
    )

    # Main cluster first, then one expected call per entry in the shared
    # replica-cluster fixture (same data as the original literal list).
    expected_calls = [
        unittest.mock.call(CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD)
    ]
    expected_calls += [
        unittest.mock.call(rc['id'], rc['zone'], 1, enums.StorageType.SSD)
        for rc in CBT_REPLICATE_CLUSTERS
    ]
    cluster.assert_has_calls(expected_calls, any_order=True)
    get_client.assert_called_once_with(project_id='example-project')
    instance_create.assert_called_once_with(clusters=mock.ANY)
    self.assertEqual(result.instance_id, 'instance')

@mock.patch(
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
new_callable=PropertyMock,
Expand Down
65 changes: 65 additions & 0 deletions tests/providers/google/cloud/operators/test_bigtable.py
Expand Up @@ -36,6 +36,11 @@
INSTANCE_ID = 'test-instance-id'
CLUSTER_ID = 'test-cluster-id'
CLUSTER_ZONE = 'us-central1-f'
REPLICATE_CLUSTERS = [
{'id': 'replica-1', 'zone': 'us-west1-a'},
{'id': 'replica-2', 'zone': 'us-central1-f'},
{'id': 'replica-3', 'zone': 'us-east1-d'},
]
GCP_CONN_ID = 'test-gcp-conn-id'
NODES = 5
INSTANCE_DISPLAY_NAME = "test instance"
Expand Down Expand Up @@ -131,6 +136,66 @@ def test_different_error_reraised(self, mock_hook):
main_cluster_id=CLUSTER_ID,
main_cluster_zone=CLUSTER_ZONE,
project_id=PROJECT_ID,
replica_clusters=None,
replica_cluster_id=None,
replica_cluster_zone=None,
timeout=None
)

@mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
def test_create_instance_that_doesnt_exists(self, mock_hook):
    """When the hook reports no existing instance, ``execute`` must create it,
    passing all optional parameters through as ``None`` defaults."""
    mock_hook.return_value.get_instance.return_value = None
    operator = BigtableCreateInstanceOperator(
        project_id=PROJECT_ID,
        instance_id=INSTANCE_ID,
        main_cluster_id=CLUSTER_ID,
        main_cluster_zone=CLUSTER_ZONE,
        task_id="id",
        gcp_conn_id=GCP_CONN_ID,
    )
    operator.execute(None)
    mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
    # Every optional knob defaults to None when the operator is built with
    # only the required arguments.
    expected_kwargs = dict(
        cluster_nodes=None,
        cluster_storage_type=None,
        instance_display_name=None,
        instance_id=INSTANCE_ID,
        instance_labels=None,
        instance_type=None,
        main_cluster_id=CLUSTER_ID,
        main_cluster_zone=CLUSTER_ZONE,
        project_id=PROJECT_ID,
        replica_clusters=None,
        replica_cluster_id=None,
        replica_cluster_zone=None,
        timeout=None,
    )
    mock_hook.return_value.create_instance.assert_called_once_with(**expected_kwargs)

@mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
def test_create_instance_with_replicas_that_doesnt_exists(self, mock_hook):
mock_hook.return_value.get_instance.return_value = None
op = BigtableCreateInstanceOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
main_cluster_id=CLUSTER_ID,
main_cluster_zone=CLUSTER_ZONE,
replica_clusters=REPLICATE_CLUSTERS,
task_id="id",
gcp_conn_id=GCP_CONN_ID
)
op.execute(None)
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.create_instance.assert_called_once_with(
cluster_nodes=None,
cluster_storage_type=None,
instance_display_name=None,
instance_id=INSTANCE_ID,
instance_labels=None,
instance_type=None,
main_cluster_id=CLUSTER_ID,
main_cluster_zone=CLUSTER_ZONE,
project_id=PROJECT_ID,
replica_clusters=REPLICATE_CLUSTERS,
replica_cluster_id=None,
replica_cluster_zone=None,
timeout=None
Expand Down

0 comments on commit b0598b5

Please sign in to comment.