Skip to content

Commit

Permalink
Add Bigtable Update Instance Hook/Operator (#10340)
Browse files Browse the repository at this point in the history
Add Bigtable Update Instance Hook/Operator
  • Loading branch information
ryanyuan committed Aug 16, 2020
1 parent 6656464 commit 382c101
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 3 deletions.
18 changes: 17 additions & 1 deletion airflow/providers/google/cloud/example_dags/example_bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,21 @@
from airflow import models
from airflow.providers.google.cloud.operators.bigtable import (
BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
BigtableDeleteTableOperator, BigtableUpdateClusterOperator, BigtableUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
from airflow.utils.dates import days_ago

# Environment-driven configuration for the example Bigtable DAG.
# Every value falls back to a safe default so the DAG file parses without any setup.
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
    "CBT_INSTANCE_DISPLAY_NAME_UPDATED", "Human-readable name - updated"
)
CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
CBT_INSTANCE_TYPE_PROD = getenv('CBT_INSTANCE_TYPE_PROD', '1')
CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
# BUG FIX: this previously read the 'CBT_INSTANCE_LABELS' variable, so the
# "updated" labels could never be set independently via the environment.
CBT_INSTANCE_LABELS_UPDATED = getenv('CBT_INSTANCE_LABELS_UPDATED', '{"env": "prod"}')
CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
Expand Down Expand Up @@ -103,6 +108,16 @@
create_instance_task >> create_instance_task2
# [END howto_operator_gcp_bigtable_instance_create]

# [START howto_operator_gcp_bigtable_instance_update]
# Updates mutable instance properties (display name, type, labels) in place.
# NOTE(review): this fragment appears to live inside a `with models.DAG(...)` block
# (the header is outside this view) — project_id is intentionally omitted here so the
# default from the GCP connection is used; a second variant with an explicit
# project_id is the documented alternative.
update_instance_task = BigtableUpdateInstanceOperator(
instance_id=CBT_INSTANCE_ID,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
# env vars are strings; the operator expects the numeric enum value.
instance_type=int(CBT_INSTANCE_TYPE_PROD),
# labels arrive as a JSON string from the environment; decode to a dict.
instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
task_id='update_instance_task',
)
# [END howto_operator_gcp_bigtable_instance_update]

# [START howto_operator_gcp_bigtable_cluster_update]
cluster_update_task = BigtableUpdateClusterOperator(
project_id=GCP_PROJECT_ID,
Expand Down Expand Up @@ -186,6 +201,7 @@
create_instance_task \
>> create_table_task \
>> cluster_update_task \
>> update_instance_task \
>> delete_table_task
create_instance_task2 \
>> create_table_task2 \
Expand Down
46 changes: 46 additions & 0 deletions airflow/providers/google/cloud/hooks/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""
This module contains a Google Cloud Bigtable Hook.
"""
import enum
from typing import Dict, List, Optional, Sequence, Union

from google.cloud.bigtable import Client
Expand Down Expand Up @@ -184,6 +185,51 @@ def create_instance(
operation.result(timeout)
return instance

@GoogleBaseHook.fallback_to_default_project_id
def update_instance(
    self,
    instance_id: str,
    project_id: str,
    instance_display_name: Optional[str] = None,
    instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
    instance_labels: Optional[Dict] = None,
    timeout: Optional[float] = None
) -> Instance:
    """
    Update an existing instance.

    :type instance_id: str
    :param instance_id: The ID for the existing instance.
    :type project_id: str
    :param project_id: Optional, Google Cloud Platform project ID where the
        BigTable exists. If set to None or missing,
        the default project_id from the GCP connection is used.
    :type instance_display_name: str
    :param instance_display_name: (optional) Human-readable name of the instance.
    :type instance_type: enums.Instance.Type or enum.IntEnum
    :param instance_type: (optional) The type of the instance.
    :type instance_labels: dict
    :param instance_labels: (optional) Dictionary of labels to associate with the
        instance.
    :type timeout: float
    :param timeout: (optional) timeout (in seconds) for instance update.
        If None is specified, the hook waits indefinitely for the operation.
    :return: The updated instance.
    """
    # BUG FIX: coerce only when a type was actually supplied —
    # enums.Instance.Type(None) raises, which made the documented-optional
    # instance_type argument effectively mandatory.
    if instance_type is not None:
        instance_type = enums.Instance.Type(instance_type)

    instance = Instance(
        instance_id=instance_id,
        client=self._get_client(project_id=project_id),
        display_name=instance_display_name,
        instance_type=instance_type,
        labels=instance_labels,
    )

    # Instance.update() returns a long-running operation; block until it
    # completes (or until ``timeout`` seconds have elapsed).
    operation = instance.update()
    operation.result(timeout)

    return instance

@staticmethod
def create_table(
instance: Instance,
Expand Down
79 changes: 78 additions & 1 deletion airflow/providers/google/cloud/operators/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"""
This module contains Google Cloud Bigtable operators.
"""
from typing import Dict, Iterable, List, Optional
import enum
from typing import Dict, Iterable, List, Optional, Union

import google.api_core.exceptions
from google.cloud.bigtable.column_family import GarbageCollectionRule
Expand Down Expand Up @@ -158,6 +159,82 @@ def execute(self, context):
raise e


class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
    """
    Updates an existing Cloud Bigtable instance.

    For more details about instance update have a look at the reference:
    https://googleapis.dev/python/bigtable/latest/instance.html#google.cloud.bigtable.instance.Instance.update

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BigtableUpdateInstanceOperator`

    :type instance_id: str
    :param instance_id: The ID of the Cloud Bigtable instance to update.
    :type project_id: str
    :param project_id: Optional, the ID of the GCP project. If set to None or missing,
        the default project_id from the GCP connection is used.
    :type instance_display_name: str
    :param instance_display_name: (optional) Human-readable name of the instance.
    :type instance_type: enums.Instance.Type or enum.IntEnum
    :param instance_type: (optional) The type of the instance.
    :type instance_labels: dict
    :param instance_labels: (optional) Dictionary of labels to associate
        with the instance.
    :type timeout: float
    :param timeout: (optional) timeout (in seconds) for instance update.
        If None is specified, the operator waits indefinitely for the operation.
    :param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # instance_id is the only argument the validation mixin requires to be non-empty.
    REQUIRED_ATTRIBUTES: Iterable[str] = ['instance_id']
    template_fields: Iterable[str] = ['project_id', 'instance_id']

    @apply_defaults
    def __init__(self, *,
                 instance_id: str,
                 project_id: Optional[str] = None,
                 instance_display_name: Optional[str] = None,
                 instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
                 instance_labels: Optional[Dict] = None,
                 timeout: Optional[float] = None,
                 gcp_conn_id: str = 'google_cloud_default',
                 **kwargs) -> None:
        self.project_id = project_id
        self.instance_id = instance_id
        self.instance_display_name = instance_display_name
        self.instance_type = instance_type
        self.instance_labels = instance_labels
        self.timeout = timeout
        # Validate required attributes before calling the BaseOperator
        # constructor, so a misconfigured task fails at DAG-parse time.
        self._validate_inputs()
        self.gcp_conn_id = gcp_conn_id
        super().__init__(**kwargs)

    def execute(self, context):
        hook = BigtableHook(gcp_conn_id=self.gcp_conn_id)
        # Fail fast with a clear message when the instance is missing, instead
        # of surfacing an opaque API error from the update call.
        instance = hook.get_instance(project_id=self.project_id,
                                     instance_id=self.instance_id)
        if not instance:
            raise AirflowException(
                f"Dependency: instance '{self.instance_id}' does not exist."
            )

        try:
            hook.update_instance(
                project_id=self.project_id,
                instance_id=self.instance_id,
                instance_display_name=self.instance_display_name,
                instance_type=self.instance_type,
                instance_labels=self.instance_labels,
                timeout=self.timeout,
            )
        except google.api_core.exceptions.GoogleAPICallError as e:
            self.log.error('An error occurred. Exiting.')
            raise e


class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
"""
Deletes the Cloud Bigtable instance, including its clusters and all related tables.
Expand Down
23 changes: 23 additions & 0 deletions docs/howto/operator/google/cloud/bigtable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ it will be retrieved from the GCP connection used. Both variants are shown:
:start-after: [START howto_operator_gcp_bigtable_instance_create]
:end-before: [END howto_operator_gcp_bigtable_instance_create]

.. _howto/operator:BigtableUpdateInstanceOperator:

BigtableUpdateInstanceOperator
------------------------------

Use the :class:`~airflow.providers.google.cloud.operators.bigtable.BigtableUpdateInstanceOperator`
to update an existing Google Cloud Bigtable instance.

Only the following configuration can be updated for an existing instance:
instance_display_name, instance_type and instance_labels.

Using the operator
""""""""""""""""""

You can create the operator with or without project id. If project id is missing
it will be retrieved from the GCP connection used. Both variants are shown:

.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_bigtable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_instance_update]
:end-before: [END howto_operator_gcp_bigtable_instance_update]

.. _howto/operator:BigtableDeleteInstanceOperator:

BigtableDeleteInstanceOperator
Expand Down
43 changes: 43 additions & 0 deletions tests/providers/google/cloud/hooks/test_bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import mock
from google.cloud.bigtable import Client
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable_admin_v2 import enums
from mock import PropertyMock

from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
Expand All @@ -31,6 +32,9 @@
)

CBT_INSTANCE = 'instance'
CBT_INSTANCE_DISPLAY_NAME = "test instance"
CBT_INSTANCE_TYPE = enums.Instance.Type.PRODUCTION
CBT_INSTANCE_LABELS = {"env": "sit"}
CBT_CLUSTER = 'cluster'
CBT_ZONE = 'zone'
CBT_TABLE = 'table'
Expand Down Expand Up @@ -102,6 +106,23 @@ def test_create_instance_overridden_project_id(self, get_client, instance_create
instance_create.assert_called_once_with(clusters=mock.ANY)
self.assertEqual(res.instance_id, 'instance')

@mock.patch('google.cloud.bigtable.instance.Instance.update')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_update_instance_overridden_project_id(self, get_client, instance_update):
    # Verifies that an explicitly passed project_id overrides the (absent)
    # connection default when updating an instance.
    operation = mock.Mock()
    # BUG FIX: the original assigned ``operation.result_return_value``, which
    # only creates an unused Mock attribute; the intended spelling configures
    # the return value of ``operation.result(...)``.
    operation.result.return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
    instance_update.return_value = operation
    res = self.bigtable_hook_no_default_project_id.update_instance(
        project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
        instance_id=CBT_INSTANCE,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
        instance_type=CBT_INSTANCE_TYPE,
        instance_labels=CBT_INSTANCE_LABELS
    )
    get_client.assert_called_once_with(project_id='example-project')
    instance_update.assert_called_once_with()
    self.assertEqual(res.instance_id, 'instance')

@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_delete_table_overridden_project_id(self, get_client):
instance_method = get_client.return_value.instance
Expand Down Expand Up @@ -268,6 +289,28 @@ def test_create_instance(self, get_client, instance_create, mock_project_id):
instance_create.assert_called_once_with(clusters=mock.ANY)
self.assertEqual(res.instance_id, 'instance')

@mock.patch(
    'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
    new_callable=PropertyMock,
    return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST
)
@mock.patch('google.cloud.bigtable.instance.Instance.update')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_update_instance(self, get_client, instance_update, mock_project_id):
    # Verifies instance update through a hook whose connection carries a
    # default project_id (mocked via the project_id property above).
    operation = mock.Mock()
    # BUG FIX: the original assigned ``operation.result_return_value``, which
    # only creates an unused Mock attribute; the intended spelling configures
    # the return value of ``operation.result(...)``.
    operation.result.return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
    instance_update.return_value = operation
    res = self.bigtable_hook_default_project_id.update_instance(
        instance_id=CBT_INSTANCE,
        instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
        instance_type=CBT_INSTANCE_TYPE,
        instance_labels=CBT_INSTANCE_LABELS,
        project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
    )
    get_client.assert_called_once_with(project_id='example-project')
    instance_update.assert_called_once_with()
    self.assertEqual(res.instance_id, 'instance')

@mock.patch('google.cloud.bigtable.instance.Instance.create')
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
def test_create_instance_overridden_project_id(self, get_client, instance_create):
Expand Down

0 comments on commit 382c101

Please sign in to comment.