Skip to content

Commit 3fc9461

Browse files
FloMomeladkal
andauthored
Add CloudSQLCloneInstanceOperator (#29726)
* Add google-cloud-sql operator to clone instances. --------- Co-authored-by: eladkal <[email protected]>
1 parent 47ab0ca commit 3fc9461

File tree

6 files changed

+237
-1
lines changed

6 files changed

+237
-1
lines changed

airflow/providers/google/cloud/hooks/cloud_sql.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,33 @@ def import_instance(self, instance: str, body: dict, project_id: str) -> None:
347347
except HttpError as ex:
348348
raise AirflowException(f"Importing instance {instance} failed: {ex.content}")
349349

350+
@GoogleBaseHook.fallback_to_default_project_id
351+
def clone_instance(self, instance: str, body: dict, project_id: str) -> None:
352+
"""
353+
Clones an instance to a target instance.
354+
355+
:param instance: Database instance ID to be cloned. This does not include the
356+
project ID.
357+
:param instance: Database instance ID to be used for the clone. This does not include the
358+
project ID.
359+
:param body: The request body, as described in
360+
https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1/instances/clone
361+
:param project_id: Project ID of the project that contains the instance. If set
362+
to None or missing, the default project_id from the Google Cloud connection is used.
363+
:return: None
364+
"""
365+
try:
366+
response = (
367+
self.get_conn()
368+
.instances()
369+
.clone(project=project_id, instance=instance, body=body)
370+
.execute(num_retries=self.num_retries)
371+
)
372+
operation_name = response["name"]
373+
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)
374+
except HttpError as ex:
375+
raise AirflowException(f"Cloning of instance {instance} failed: {ex.content}")
376+
350377
def _wait_for_operation_to_complete(self, project_id: str, operation_name: str) -> None:
351378
"""
352379
Waits for the named operation to complete - checks status of the

airflow/providers/google/cloud/operators/cloud_sql.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,98 @@ def execute(self, context: Context) -> bool | None:
524524
return hook.delete_instance(project_id=self.project_id, instance=self.instance)
525525

526526

527+
class CloudSQLCloneInstanceOperator(CloudSQLBaseOperator):
528+
"""
529+
Clones an instance to a target instance
530+
531+
.. seealso::
532+
For more information on how to use this operator, take a look at the guide:
533+
:ref:`howto/operator:CloudSQLCloneInstanceOperator`
534+
535+
:param instance: Database instance ID to be cloned. This does not include the
536+
project ID.
537+
:param destination_instance_name: Database instance ID to be created. This does not include the
538+
project ID.
539+
:param clone_context: additional clone_context parameters as described in
540+
https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1/instances/clone
541+
:param project_id: Project ID of the project that contains the instance. If set
542+
to None or missing, the default project_id from the Google Cloud connection is used.
543+
:param gcp_conn_id: The connection ID used to connect to Google Cloud.
544+
:param api_version: API version used (e.g. v1beta4).
545+
:param impersonation_chain: Optional service account to impersonate using short-term
546+
credentials, or chained list of accounts required to get the access_token
547+
of the last account in the list, which will be impersonated in the request.
548+
If set as a string, the account must grant the originating account
549+
the Service Account Token Creator IAM role.
550+
If set as a sequence, the identities from the list must grant
551+
Service Account Token Creator IAM role to the directly preceding identity, with first
552+
account from the list granting this role to the originating account (templated).
553+
"""
554+
555+
# [START gcp_sql_clone_template_fields]
556+
template_fields: Sequence[str] = (
557+
"project_id",
558+
"instance",
559+
"destination_instance_name",
560+
"gcp_conn_id",
561+
"api_version",
562+
)
563+
# [END gcp_sql_clone_template_fields]
564+
565+
def __init__(
566+
self,
567+
*,
568+
instance: str,
569+
destination_instance_name: str,
570+
clone_context: dict | None = None,
571+
project_id: str | None = None,
572+
gcp_conn_id: str = "google_cloud_default",
573+
api_version: str = "v1beta4",
574+
impersonation_chain: str | Sequence[str] | None = None,
575+
**kwargs,
576+
) -> None:
577+
self.destination_instance_name = destination_instance_name
578+
self.clone_context = clone_context or {}
579+
super().__init__(
580+
project_id=project_id,
581+
instance=instance,
582+
gcp_conn_id=gcp_conn_id,
583+
api_version=api_version,
584+
impersonation_chain=impersonation_chain,
585+
**kwargs,
586+
)
587+
588+
def _validate_inputs(self) -> None:
589+
super()._validate_inputs()
590+
if not self.destination_instance_name:
591+
raise AirflowException("The required parameter 'destination_instance_name' is empty or None")
592+
593+
def execute(self, context: Context):
594+
hook = CloudSQLHook(
595+
gcp_conn_id=self.gcp_conn_id,
596+
api_version=self.api_version,
597+
impersonation_chain=self.impersonation_chain,
598+
)
599+
if not self._check_if_instance_exists(self.instance, hook):
600+
raise AirflowException(
601+
f"Cloud SQL instance with ID {self.instance} does not exist. "
602+
"Please specify another instance to patch."
603+
)
604+
else:
605+
body = {
606+
"cloneContext": {
607+
"kind": "sql#cloneContext",
608+
"destinationInstanceName": self.destination_instance_name,
609+
**self.clone_context,
610+
}
611+
}
612+
return hook.clone_instance(
613+
project_id=self.project_id,
614+
body=body,
615+
instance=self.instance,
616+
)
617+
618+
527619
class CloudSQLCreateInstanceDatabaseOperator(CloudSQLBaseOperator):
528620
"""
529621
Creates a new database inside a Cloud SQL instance.

docs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,49 @@ More information
473473
See Google Cloud SQL API documentation to `patch an instance
474474
<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch>`_.
475475

476+
.. _howto/operator:CloudSQLCloneInstanceOperator:
477+
478+
CloudSQLCloneInstanceOperator
479+
-----------------------------
480+
481+
Clones an Cloud SQL instance.
482+
483+
For parameter definition, take a look at
484+
:class:`~airflow.providers.google.cloud.operators.cloud_sql.CloudSQLCloneInstanceOperator`.
485+
486+
Arguments
487+
"""""""""
488+
489+
For ``clone_context`` object attributes please refer to
490+
`CloneContext <https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1beta4/instances/clone#clonecontext>`_
491+
492+
Using the operator
493+
""""""""""""""""""
494+
495+
You can create the operator with or without project id. If project id is missing it will be retrieved from the Google
496+
Cloud connection used. Both variants are shown:
497+
498+
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py
499+
:language: python
500+
:dedent: 4
501+
:start-after: [START howto_operator_cloudsql_clone]
502+
:end-before: [END howto_operator_cloudsql_clone]
503+
504+
Templating
505+
""""""""""
506+
507+
.. literalinclude:: /../../airflow/providers/google/cloud/operators/cloud_sql.py
508+
:language: python
509+
:dedent: 4
510+
:start-after: [START gcp_sql_clone_template_fields]
511+
:end-before: [END gcp_sql_clone_template_fields]
512+
513+
More information
514+
""""""""""""""""
515+
516+
See Google Cloud SQL API documentation to `clone an instance
517+
<https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1beta4/instances/clone>`_.
518+
476519
.. _howto/operator:CloudSQLExecuteQueryOperator:
477520

478521
CloudSQLExecuteQueryOperator

tests/providers/google/cloud/hooks/test_cloud_sql.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,32 @@ def test_delete_instance_with_in_progress_retry(
325325
operation_name="operation_id", project_id="example-project"
326326
)
327327

328+
@mock.patch(
329+
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook.get_credentials_and_project_id",
330+
return_value=(mock.MagicMock(), "example-project"),
331+
)
332+
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook.get_conn")
333+
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook._wait_for_operation_to_complete")
334+
def test_instance_clone(self, wait_for_operation_to_complete, get_conn, mock_get_credentials):
335+
clone_method = get_conn.return_value.instances.return_value.clone
336+
execute_method = clone_method.return_value.execute
337+
execute_method.return_value = {"name": "operation_id"}
338+
wait_for_operation_to_complete.return_value = None
339+
body = {
340+
"cloneContext": {
341+
"kind": "sql#cloneContext",
342+
"destinationInstanceName": "clonedInstance",
343+
}
344+
}
345+
self.cloudsql_hook.clone_instance(instance="instance", body=body)
346+
347+
clone_method.assert_called_once_with(instance="instance", project="example-project", body=body)
348+
execute_method.assert_called_once_with(num_retries=5)
349+
wait_for_operation_to_complete.assert_called_once_with(
350+
operation_name="operation_id", project_id="example-project"
351+
)
352+
assert 1 == mock_get_credentials.call_count
353+
328354
@mock.patch(
329355
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLHook.get_credentials_and_project_id",
330356
return_value=(mock.MagicMock(), "example-project"),

tests/providers/google/cloud/operators/test_cloud_sql.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from airflow.exceptions import AirflowException
2626
from airflow.models import Connection
2727
from airflow.providers.google.cloud.operators.cloud_sql import (
28+
CloudSQLCloneInstanceOperator,
2829
CloudSQLCreateInstanceDatabaseOperator,
2930
CloudSQLCreateInstanceOperator,
3031
CloudSQLDeleteInstanceDatabaseOperator,
@@ -360,6 +361,36 @@ def test_instance_delete(self, mock_hook, _check_if_instance_exists):
360361
project_id=PROJECT_ID, instance=INSTANCE_NAME
361362
)
362363

364+
@mock.patch(
365+
"airflow.providers.google.cloud.operators.cloud_sql.CloudSQLCloneInstanceOperator._check_if_instance_exists"
366+
)
367+
@mock.patch("airflow.providers.google.cloud.operators.cloud_sql.CloudSQLHook")
368+
def test_instance_clone(self, mock_hook, _check_if_instance_exists):
369+
destination_instance_name = "clone-test-name"
370+
_check_if_instance_exists.return_value = True
371+
op = CloudSQLCloneInstanceOperator(
372+
project_id=PROJECT_ID,
373+
instance=INSTANCE_NAME,
374+
destination_instance_name=destination_instance_name,
375+
task_id="id",
376+
)
377+
result = op.execute(None)
378+
assert result
379+
mock_hook.assert_called_once_with(
380+
api_version="v1beta4",
381+
gcp_conn_id="google_cloud_default",
382+
impersonation_chain=None,
383+
)
384+
body = {
385+
"cloneContext": {
386+
"kind": "sql#cloneContext",
387+
"destinationInstanceName": destination_instance_name,
388+
}
389+
}
390+
mock_hook.return_value.clone_instance.assert_called_once_with(
391+
project_id=PROJECT_ID, instance=INSTANCE_NAME, body=body
392+
)
393+
363394
@mock.patch(
364395
"airflow.providers.google.cloud.operators.cloud_sql"
365396
".CloudSQLDeleteInstanceOperator._check_if_instance_exists"

tests/system/providers/google/cloud/cloud_sql/example_cloud_sql.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from airflow import models
3535
from airflow.models.xcom_arg import XComArg
3636
from airflow.providers.google.cloud.operators.cloud_sql import (
37+
CloudSQLCloneInstanceOperator,
3738
CloudSQLCreateInstanceDatabaseOperator,
3839
CloudSQLCreateInstanceOperator,
3940
CloudSQLDeleteInstanceDatabaseOperator,
@@ -64,7 +65,7 @@
6465

6566
FAILOVER_REPLICA_NAME = f"{INSTANCE_NAME}-failover-replica"
6667
READ_REPLICA_NAME = f"{INSTANCE_NAME}-read-replica"
67-
68+
CLONED_INSTANCE_NAME = f"{INSTANCE_NAME}-clone"
6869

6970
# Bodies below represent Cloud SQL instance resources:
7071
# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
@@ -232,6 +233,15 @@
232233
)
233234
# [END howto_operator_cloudsql_import]
234235

236+
# ############################################## #
237+
# ### CLONE AN INSTANCE ######################## #
238+
# ############################################## #
239+
# [START howto_operator_cloudsql_clone]
240+
sql_instance_clone = CloudSQLCloneInstanceOperator(
241+
instance=INSTANCE_NAME, destination_instance_name=CLONED_INSTANCE_NAME, task_id="sql_instance_clone"
242+
)
243+
# [END howto_operator_cloudsql_clone]
244+
235245
# ############################################## #
236246
# ### DELETING A DATABASE FROM AN INSTANCE ##### #
237247
# ############################################## #
@@ -260,6 +270,11 @@
260270
sql_instance_failover_replica_delete_task.trigger_rule = TriggerRule.ALL_DONE
261271
sql_instance_read_replica_delete_task.trigger_rule = TriggerRule.ALL_DONE
262272

273+
sql_instance_clone_delete_task = CloudSQLDeleteInstanceOperator(
274+
instance=CLONED_INSTANCE_NAME,
275+
task_id="sql_instance_clone_delete_task",
276+
)
277+
263278
# [START howto_operator_cloudsql_delete]
264279
sql_instance_delete_task = CloudSQLDeleteInstanceOperator(
265280
instance=INSTANCE_NAME, task_id="sql_instance_delete_task"
@@ -284,9 +299,11 @@
284299
>> sql_export_task
285300
>> sql_gcp_add_object_permission_task
286301
>> sql_import_task
302+
>> sql_instance_clone
287303
>> sql_db_delete_task
288304
>> sql_instance_failover_replica_delete_task
289305
>> sql_instance_read_replica_delete_task
306+
>> sql_instance_clone_delete_task
290307
>> sql_instance_delete_task
291308
# TEST TEARDOWN
292309
>> delete_bucket

0 commit comments

Comments
 (0)