From 4f6340b0930bb1b5430209c4a1ff196c42b834d0 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Wed, 6 Mar 2024 18:19:16 +0530 Subject: [PATCH] feat: add retry and timeout for batch dml (#1107) * feat(spanner): add retry, timeout for batch update * feat(spanner): add samples for retry, timeout * feat(spanner): update unittest * feat(spanner): update comments * feat(spanner): update code for retry * feat(spanner): update comment --- google/cloud/spanner_v1/transaction.py | 17 ++++++++- samples/samples/snippets.py | 50 ++++++++++++++++++++++++++ samples/samples/snippets_test.py | 7 ++++ tests/unit/test_spanner.py | 14 ++++++++ tests/unit/test_transaction.py | 25 +++++++++++-- 5 files changed, 110 insertions(+), 3 deletions(-) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 1f5ff1098a..b02a43e8d2 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -410,7 +410,14 @@ def execute_update( return response.stats.row_count_exact - def batch_update(self, statements, request_options=None): + def batch_update( + self, + statements, + request_options=None, + *, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ): """Perform a batch of DML statements via an ``ExecuteBatchDml`` request. :type statements: @@ -431,6 +438,12 @@ def batch_update(self, statements, request_options=None): If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type retry: :class:`~google.api_core.retry.Retry` + :param retry: (Optional) The retry settings for this request. + + :type timeout: float + :param timeout: (Optional) The timeout for this request. + :rtype: Tuple(status, Sequence[int]) :returns: @@ -486,6 +499,8 @@ def batch_update(self, statements, request_options=None): api.execute_batch_dml, request=request, metadata=metadata, + retry=retry, + timeout=timeout, ) if self._transaction_id is None: diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index 5cd1cc8e8b..23d9d8aff1 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -3017,6 +3017,51 @@ def directed_read_options( # [END spanner_directed_read] +def set_custom_timeout_and_retry(instance_id, database_id): + """Executes a snapshot read with custom timeout and retry.""" + # [START spanner_set_custom_timeout_and_retry] + from google.api_core import retry + from google.api_core import exceptions as core_exceptions + + # instance_id = "your-spanner-instance" + # database_id = "your-spanner-db-id" + spanner_client = spanner.Client() + instance = spanner_client.instance(instance_id) + database = instance.database(database_id) + + retry = retry.Retry( + # Customize retry with an initial wait time of 500 milliseconds. + initial=0.5, + # Customize retry with a maximum wait time of 16 seconds. + maximum=16, + # Customize retry with a wait time multiplier per iteration of 1.5. + multiplier=1.5, + # Customize retry with a timeout on + # how long a certain RPC may be retried in + # case the server returns an error. + timeout=60, + # Configure which errors should be retried. + predicate=retry.if_exception_type( + core_exceptions.ServiceUnavailable, + ), + ) + + # Set a custom retry and timeout setting. + with database.snapshot() as snapshot: + results = snapshot.execute_sql( + "SELECT SingerId, AlbumId, AlbumTitle FROM Albums", + # Set custom retry setting for this request + retry=retry, + # Set custom timeout of 60 seconds for this request + timeout=60, + ) + + for row in results: + print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row)) + + # [END spanner_set_custom_timeout_and_retry] + + if __name__ == "__main__": # noqa: C901 parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter @@ -3157,6 +3202,9 @@ def directed_read_options( ) enable_fine_grained_access_parser.add_argument("--title", default="condition title") subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__) + subparsers.add_parser( + "set_custom_timeout_and_retry", help=set_custom_timeout_and_retry.__doc__ + ) args = parser.parse_args() @@ -3290,3 +3338,5 @@ def directed_read_options( ) elif args.command == "directed_read_options": directed_read_options(args.instance_id, args.database_id) + elif args.command == "set_custom_timeout_and_retry": + set_custom_timeout_and_retry(args.instance_id, args.database_id) diff --git a/samples/samples/snippets_test.py b/samples/samples/snippets_test.py index 6942f8fa79..7c8de8ab96 100644 --- a/samples/samples/snippets_test.py +++ b/samples/samples/snippets_test.py @@ -859,3 +859,10 @@ def test_directed_read_options(capsys, instance_id, sample_database): snippets.directed_read_options(instance_id, sample_database.database_id) out, _ = capsys.readouterr() assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out + + +@pytest.mark.dependency(depends=["insert_data"]) +def test_set_custom_timeout_and_retry(capsys, instance_id, sample_database): + snippets.set_custom_timeout_and_retry(instance_id, sample_database.database_id) + out, _ = capsys.readouterr() + assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index 3663d8bdc9..0c7feed5ac 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -556,6 +556,8 @@ def test_transaction_should_include_begin_with_first_batch_update(self): ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( @@ -574,6 +576,8 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self._execute_update_helper(transaction=transaction, api=api) api.execute_sql.assert_called_once_with( @@ -715,6 +719,8 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) def test_transaction_should_use_transaction_id_returned_by_first_batch_update(self): @@ -729,6 +735,8 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self._read_helper(transaction=transaction, api=api) api.streaming_read.assert_called_once_with( @@ -797,6 +805,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self.assertEqual(api.execute_sql.call_count, 2) @@ -846,6 +856,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) api.execute_batch_dml.assert_any_call( @@ -854,6 +866,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self.assertEqual(api.execute_sql.call_count, 1) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index a673eabb83..b40ae8843f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -662,7 +662,14 @@ def test_batch_update_other_error(self): with self.assertRaises(RuntimeError): transaction.batch_update(statements=[DML_QUERY]) - def _batch_update_helper(self, error_after=None, count=0, request_options=None): + def _batch_update_helper( + self, + error_after=None, + count=0, + request_options=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ): from google.rpc.status_pb2 import Status from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import param_types @@ -716,7 +723,10 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None): request_options = RequestOptions(request_options) status, row_counts = transaction.batch_update( - dml_statements, request_options=request_options + dml_statements, + request_options=request_options, + retry=retry, + timeout=timeout, ) self.assertEqual(status, expected_status) @@ -753,6 +763,8 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None): ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=retry, + timeout=timeout, ) self.assertEqual(transaction._execute_sql_count, count + 1) @@ -826,6 +838,15 @@ def test_batch_update_error(self): self.assertEqual(transaction._execute_sql_count, 1) + def test_batch_update_w_timeout_param(self): + self._batch_update_helper(timeout=2.0) + + def test_batch_update_w_retry_param(self): + self._batch_update_helper(retry=gapic_v1.method.DEFAULT) + + def test_batch_update_w_timeout_and_retry_params(self): + self._batch_update_helper(retry=gapic_v1.method.DEFAULT, timeout=2.0) + def test_context_mgr_success(self): import datetime from google.cloud.spanner_v1 import CommitResponse