Add CloudSQLCloneInstanceOperator (#29726)
* Add google-cloud-sql operator to clone instances.

---------

Co-authored-by: eladkal <[email protected]>
FloMom and eladkal committed Mar 3, 2023
1 parent 47ab0ca commit 3fc9461
Showing 6 changed files with 237 additions and 1 deletion.
27 changes: 27 additions & 0 deletions airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -347,6 +347,33 @@ def import_instance(self, instance: str, body: dict, project_id: str) -> None:
except HttpError as ex:
raise AirflowException(f"Importing instance {instance} failed: {ex.content}")

@GoogleBaseHook.fallback_to_default_project_id
def clone_instance(self, instance: str, body: dict, project_id: str) -> None:
"""
Clones an instance to a target instance.

:param instance: Database instance ID to be cloned. This does not include the
project ID.
:param body: The request body, as described in
https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1/instances/clone
:param project_id: Project ID of the project that contains the instance. If set
to None or missing, the default project_id from the Google Cloud connection is used.
:return: None
"""
try:
response = (
self.get_conn()
.instances()
.clone(project=project_id, instance=instance, body=body)
.execute(num_retries=self.num_retries)
)
operation_name = response["name"]
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)
except HttpError as ex:
raise AirflowException(f"Cloning of instance {instance} failed: {ex.content}")

def _wait_for_operation_to_complete(self, project_id: str, operation_name: str) -> None:
"""
Waits for the named operation to complete - checks status of the
92 changes: 92 additions & 0 deletions airflow/providers/google/cloud/operators/cloud_sql.py
@@ -524,6 +524,98 @@ def execute(self, context: Context) -> bool | None:
return hook.delete_instance(project_id=self.project_id, instance=self.instance)


class CloudSQLCloneInstanceOperator(CloudSQLBaseOperator):
"""
Clones an instance to a target instance.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudSQLCloneInstanceOperator`

:param instance: Database instance ID to be cloned. This does not include the
project ID.
:param destination_instance_name: Database instance ID to be created. This does not include the
project ID.
:param clone_context: additional clone_context parameters as described in
https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1/instances/clone
:param project_id: Project ID of the project that contains the instance. If set
to None or missing, the default project_id from the Google Cloud connection is used.
:param gcp_conn_id: The connection ID used to connect to Google Cloud.
:param api_version: API version used (e.g. v1beta4).
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""

# [START gcp_sql_clone_template_fields]
template_fields: Sequence[str] = (
"project_id",
"instance",
"destination_instance_name",
"gcp_conn_id",
"api_version",
)
# [END gcp_sql_clone_template_fields]

def __init__(
self,
*,
instance: str,
destination_instance_name: str,
clone_context: dict | None = None,
project_id: str | None = None,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1beta4",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
self.destination_instance_name = destination_instance_name
self.clone_context = clone_context or {}
super().__init__(
project_id=project_id,
instance=instance,
gcp_conn_id=gcp_conn_id,
api_version=api_version,
impersonation_chain=impersonation_chain,
**kwargs,
)

def _validate_inputs(self) -> None:
super()._validate_inputs()
if not self.destination_instance_name:
raise AirflowException("The required parameter 'destination_instance_name' is empty or None")

def execute(self, context: Context):
hook = CloudSQLHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
impersonation_chain=self.impersonation_chain,
)
if not self._check_if_instance_exists(self.instance, hook):
raise AirflowException(
f"Cloud SQL instance with ID {self.instance} does not exist. "
"Please specify another instance to patch."
)
else:
body = {
"cloneContext": {
"kind": "sql#cloneContext",
"destinationInstanceName": self.destination_instance_name,
**self.clone_context,
}
}
return hook.clone_instance(
project_id=self.project_id,
body=body,
instance=self.instance,
)


class CloudSQLCreateInstanceDatabaseOperator(CloudSQLBaseOperator):
"""
Creates a new database inside a Cloud SQL instance.
Expand Down
43 changes: 43 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst
@@ -473,6 +473,49 @@ More information
See Google Cloud SQL API documentation to `patch an instance
<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch>`_.

.. _howto/operator:CloudSQLCloneInstanceOperator:

CloudSQLCloneInstanceOperator
-----------------------------

Clones a Cloud SQL instance.

For parameter definition, take a look at
:class:`~airflow.providers.google.cloud.operators.cloud_sql.CloudSQLCloneInstanceOperator`.

Arguments
"""""""""

For ``clone_context`` object attributes, please refer to
`CloneContext <https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1beta4/instances/clone#clonecontext>`_.

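The ``clone_context`` dictionary is merged into the ``cloneContext`` object of the request body,
alongside the ``destinationInstanceName`` derived from the operator arguments. A minimal sketch of a
point-in-time clone (the instance names and timestamp are placeholders; see the CloneContext
reference above for the exact attributes supported):

.. code-block:: python

    sql_instance_clone_pitr = CloudSQLCloneInstanceOperator(
        instance="my-source-instance",
        destination_instance_name="my-source-instance-clone",
        clone_context={"pointInTime": "2023-03-01T10:00:00.000Z"},
        task_id="sql_instance_clone_pitr",
    )
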
Using the operator
""""""""""""""""""

You can create the operator with or without a project ID. If the project ID is missing, it will be retrieved
from the Google Cloud connection used. Both variants are shown:

.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_cloudsql_clone]
:end-before: [END howto_operator_cloudsql_clone]
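
For reference, a minimal sketch of both variants (the instance and project names below are placeholders,
not the values used in the system test above):

.. code-block:: python

    # Explicit project ID
    sql_instance_clone_with_project = CloudSQLCloneInstanceOperator(
        project_id="example-project",
        instance="my-source-instance",
        destination_instance_name="my-source-instance-clone",
        task_id="sql_instance_clone_with_project",
    )

    # Project ID taken from the Google Cloud connection
    sql_instance_clone = CloudSQLCloneInstanceOperator(
        instance="my-source-instance",
        destination_instance_name="my-source-instance-clone",
        task_id="sql_instance_clone",
    )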

Templating
""""""""""

.. literalinclude:: /../../airflow/providers/google/cloud/operators/cloud_sql.py
:language: python
:dedent: 4
:start-after: [START gcp_sql_clone_template_fields]
:end-before: [END gcp_sql_clone_template_fields]

More information
""""""""""""""""

See Google Cloud SQL API documentation to `clone an instance
<https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1beta4/instances/clone>`_.

.. _howto/operator:CloudSQLExecuteQueryOperator:

CloudSQLExecuteQueryOperator
26 changes: 26 additions & 0 deletions tests/providers/google/cloud/hooks/test_cloud_sql.py
@@ -325,6 +325,32 @@ def test_delete_instance_with_in_progress_retry(
operation_name="operation_id", project_id="example-project"
)

@mock.patch(
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook.get_credentials_and_project_id",
return_value=(mock.MagicMock(), "example-project"),
)
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook.get_conn")
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook._wait_for_operation_to_complete")
def test_instance_clone(self, wait_for_operation_to_complete, get_conn, mock_get_credentials):
clone_method = get_conn.return_value.instances.return_value.clone
execute_method = clone_method.return_value.execute
execute_method.return_value = {"name": "operation_id"}
wait_for_operation_to_complete.return_value = None
body = {
"cloneContext": {
"kind": "sql#cloneContext",
"destinationInstanceName": "clonedInstance",
}
}
self.cloudsql_hook.clone_instance(instance="instance", body=body)

clone_method.assert_called_once_with(instance="instance", project="example-project", body=body)
execute_method.assert_called_once_with(num_retries=5)
wait_for_operation_to_complete.assert_called_once_with(
operation_name="operation_id", project_id="example-project"
)
assert 1 == mock_get_credentials.call_count

@mock.patch(
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook.get_credentials_and_project_id",
return_value=(mock.MagicMock(), "example-project"),
31 changes: 31 additions & 0 deletions tests/providers/google/cloud/operators/test_cloud_sql.py
@@ -25,6 +25,7 @@
from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.google.cloud.operators.cloud_sql import (
CloudSQLCloneInstanceOperator,
CloudSQLCreateInstanceDatabaseOperator,
CloudSQLCreateInstanceOperator,
CloudSQLDeleteInstanceDatabaseOperator,
@@ -360,6 +361,36 @@ def test_instance_delete(self, mock_hook, _check_if_instance_exists):
project_id=PROJECT_ID, instance=INSTANCE_NAME
)

@mock.patch(
"airflow.providers.google.cloud.operators.cloud_sql.CloudSQLCloneInstanceOperator._check_if_instance_exists"
)
@mock.patch("airflow.providers.google.cloud.operators.cloud_sql.CloudSQLHook")
def test_instance_clone(self, mock_hook, _check_if_instance_exists):
destination_instance_name = "clone-test-name"
_check_if_instance_exists.return_value = True
op = CloudSQLCloneInstanceOperator(
project_id=PROJECT_ID,
instance=INSTANCE_NAME,
destination_instance_name=destination_instance_name,
task_id="id",
)
result = op.execute(None)
assert result
mock_hook.assert_called_once_with(
api_version="v1beta4",
gcp_conn_id="google_cloud_default",
impersonation_chain=None,
)
body = {
"cloneContext": {
"kind": "sql#cloneContext",
"destinationInstanceName": destination_instance_name,
}
}
mock_hook.return_value.clone_instance.assert_called_once_with(
project_id=PROJECT_ID, instance=INSTANCE_NAME, body=body
)

@mock.patch(
"airflow.providers.google.cloud.operators.cloud_sql"
".CloudSQLDeleteInstanceOperator._check_if_instance_exists"
19 changes: 18 additions & 1 deletion tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
@@ -34,6 +34,7 @@
from airflow import models
from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.operators.cloud_sql import (
CloudSQLCloneInstanceOperator,
CloudSQLCreateInstanceDatabaseOperator,
CloudSQLCreateInstanceOperator,
CloudSQLDeleteInstanceDatabaseOperator,
@@ -64,7 +65,7 @@

FAILOVER_REPLICA_NAME = f"{INSTANCE_NAME}-failover-replica"
READ_REPLICA_NAME = f"{INSTANCE_NAME}-read-replica"

CLONED_INSTANCE_NAME = f"{INSTANCE_NAME}-clone"

# Bodies below represent Cloud SQL instance resources:
# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
@@ -232,6 +233,15 @@
)
# [END howto_operator_cloudsql_import]

# ############################################## #
# ### CLONE AN INSTANCE ######################## #
# ############################################## #
# [START howto_operator_cloudsql_clone]
sql_instance_clone = CloudSQLCloneInstanceOperator(
instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
)
# [END howto_operator_cloudsql_clone]

# ############################################## #
# ### DELETING A DATABASE FROM AN INSTANCE ##### #
# ############################################## #
@@ -260,6 +270,11 @@
sql_instance_failover_replica_delete_task.trigger_rule = TriggerRule.ALL_DONE
sql_instance_read_replica_delete_task.trigger_rule = TriggerRule.ALL_DONE

sql_instance_clone_delete_task = CloudSQLDeleteInstanceOperator(
instance=CLONED_INSTANCE_NAME,
task_id="sql_instance_clone_delete_task",
)

# [START howto_operator_cloudsql_delete]
sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
instance=INSTANCE_NAME, task_id="sql_instance_delete_task"
@@ -284,9 +299,11 @@
>> sql_export_task
>> sql_gcp_add_object_permission_task
>> sql_import_task
>> sql_instance_clone
>> sql_db_delete_task
>> sql_instance_failover_replica_delete_task
>> sql_instance_read_replica_delete_task
>> sql_instance_clone_delete_task
>> sql_instance_delete_task
# TEST TEARDOWN
>> delete_bucket
