Skip to content

Commit

Permalink
Fix: Retry the RST Stream error in mutate rows and read rows (#624)
Browse files Browse the repository at this point in the history
Fix: Retry the RST Stream error in mutate rows and read rows

In mutate_rows and read_rows, an Internal Server Error whose message indicates an RST Stream error is considered transient and should be retried.
  • Loading branch information
Mariatta committed Aug 8, 2022
1 parent 972939f commit d24574a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 6 deletions.
24 changes: 23 additions & 1 deletion google/cloud/bigtable/row_data.py
Expand Up @@ -332,10 +332,32 @@ class InvalidRetryRequest(RuntimeError):
"""Exception raised when retry request is invalid."""


# Substrings that mark an InternalServerError as transient.
# Entries are written in lowercase because `_retriable_internal_server_error`
# lowercases the exception message before doing its substring check, which
# makes the match case-insensitive.
RETRYABLE_INTERNAL_ERROR_MESSAGES = (
"rst_stream",
"rst stream",
"received unexpected eos on data frame from server",
)
"""Internal error messages that can be retried during read row and mutation."""


def _retriable_internal_server_error(exc):
    """Tell whether ``exc`` is an internal server error worth retrying.

    Only ``exceptions.InternalServerError`` instances qualify, and only when
    their message contains one of the substrings listed in
    ``RETRYABLE_INTERNAL_ERROR_MESSAGES``. The comparison is done on the
    lowercased message, so it is case-insensitive.

    :param exc: the exception to inspect.
    :returns: ``True`` if the error is a retriable internal server error,
        ``False`` otherwise.
    """
    # Anything that is not an internal server error is outside this check.
    if not isinstance(exc, exceptions.InternalServerError):
        return False

    lowered_message = exc.message.lower()
    for fragment in RETRYABLE_INTERNAL_ERROR_MESSAGES:
        if fragment in lowered_message:
            return True
    return False


def _retry_read_rows_exception(exc):
    """Return True if the exception is retriable for read row requests.

    A ``grpc.RpcError`` is first converted with
    ``exceptions.from_grpc_error`` so that the checks below operate on the
    corresponding ``google.api_core`` exception class.

    :param exc: the exception raised by the read rows call.
    :returns: ``True`` when ``exc`` is a retriable internal server error (see
        ``_retriable_internal_server_error``), a ``ServiceUnavailable`` or a
        ``DeadlineExceeded``; ``False`` otherwise.
    """
    if isinstance(exc, grpc.RpcError):
        exc = exceptions.from_grpc_error(exc)

    # The superseded ``return isinstance(...)`` that used to sit here returned
    # before the internal-server-error check could run, so it is removed.
    return _retriable_internal_server_error(exc) or isinstance(
        exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded)
    )


DEFAULT_RETRY_READ_ROWS = retry.Retry(
Expand Down
25 changes: 21 additions & 4 deletions google/cloud/bigtable/table.py
Expand Up @@ -23,6 +23,7 @@
from google.api_core.exceptions import NotFound
from google.api_core.exceptions import RetryError
from google.api_core.exceptions import ServiceUnavailable
from google.api_core.exceptions import InternalServerError
from google.api_core.gapic_v1.method import DEFAULT
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
Expand All @@ -37,7 +38,10 @@
from google.cloud.bigtable.row import AppendRow
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData
from google.cloud.bigtable.row_data import (
PartialRowsData,
_retriable_internal_server_error,
)
from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS
from google.cloud.bigtable.row_set import RowSet
from google.cloud.bigtable.row_set import RowRange
Expand All @@ -55,9 +59,15 @@
_MAX_BULK_MUTATIONS = 100000
VIEW_NAME_ONLY = enums.Table.View.NAME_ONLY

# Exception types caught as potentially transient during row mutation.
# InternalServerError is listed so the `except RETRYABLE_MUTATION_ERRORS`
# handler in this file sees it, but that handler only retries it when
# `_retriable_internal_server_error` accepts the message; otherwise the
# original exception is re-raised.
RETRYABLE_MUTATION_ERRORS = (
    Aborted,
    DeadlineExceeded,
    ServiceUnavailable,
    InternalServerError,
)
"""Errors which can be retried during row mutation."""


RETRYABLE_CODES: Set[int] = set()

for retryable in RETRYABLE_MUTATION_ERRORS:
Expand Down Expand Up @@ -1130,11 +1140,18 @@ def _do_mutate_retryable_rows(self):
retry=None,
**kwargs
)
except RETRYABLE_MUTATION_ERRORS:
except RETRYABLE_MUTATION_ERRORS as exc:
# If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is
# returned from the initial call, consider
# it to be retryable. Wrap as a Bigtable Retryable Error.
raise _BigtableRetryableError
# For InternalServerError, it is only retriable if the message is related to RST Stream messages
if _retriable_internal_server_error(exc) or not isinstance(
exc, InternalServerError
):
raise _BigtableRetryableError
else:
# re-raise the original exception
raise

num_responses = 0
num_retryable_responses = 0
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_row_data.py
Expand Up @@ -310,6 +310,31 @@ def test__retry_read_rows_exception_deadline_exceeded():
assert _retry_read_rows_exception(exception)


def test__retry_read_rows_exception_internal_server_not_retriable():
    """A generic InternalServerError must not be retried by read rows."""
    from google.api_core.exceptions import InternalServerError
    from google.cloud.bigtable.row_data import (
        RETRYABLE_INTERNAL_ERROR_MESSAGES,
        _retry_read_rows_exception,
    )

    plain_message = "500 Error"
    error = InternalServerError(plain_message)

    # Guard the premise of the test: the message is not a retriable one.
    assert plain_message not in RETRYABLE_INTERNAL_ERROR_MESSAGES

    is_retriable = _retry_read_rows_exception(error)
    assert not is_retriable


def test__retry_read_rows_exception_internal_server_retriable():
    """Every known retriable internal error message is retried by read rows."""
    from google.api_core.exceptions import InternalServerError
    from google.cloud.bigtable.row_data import (
        RETRYABLE_INTERNAL_ERROR_MESSAGES,
        _retry_read_rows_exception,
    )

    for retriable_text in RETRYABLE_INTERNAL_ERROR_MESSAGES:
        error = InternalServerError(retriable_text)
        assert _retry_read_rows_exception(error)


def test__retry_read_rows_exception_miss_wrapped_in_grpc():
from google.api_core.exceptions import Conflict
from google.cloud.bigtable.row_data import _retry_read_rows_exception
Expand Down
55 changes: 54 additions & 1 deletion tests/unit/test_table.py
Expand Up @@ -47,6 +47,7 @@
RETRYABLE_3 = StatusCode.UNAVAILABLE.value[0]
RETRYABLES = (RETRYABLE_1, RETRYABLE_2, RETRYABLE_3)
NON_RETRYABLE = StatusCode.CANCELLED.value[0]
STATUS_INTERNAL = StatusCode.INTERNAL.value[0]


@mock.patch("google.cloud.bigtable.table._MAX_BULK_MUTATIONS", new=3)
Expand Down Expand Up @@ -1636,6 +1637,7 @@ def _do_mutate_retryable_rows_helper(
raising_retry=False,
retryable_error=False,
timeout=None,
mutate_rows_side_effect=None,
):
from google.api_core.exceptions import ServiceUnavailable
from google.cloud.bigtable.row import DirectRow
Expand Down Expand Up @@ -1664,8 +1666,13 @@ def _do_mutate_retryable_rows_helper(

data_api = client._table_data_client = _make_data_api()
if retryable_error:
data_api.mutate_rows.side_effect = ServiceUnavailable("testing")
if mutate_rows_side_effect is not None:
data_api.mutate_rows.side_effect = mutate_rows_side_effect
else:
data_api.mutate_rows.side_effect = ServiceUnavailable("testing")
else:
if mutate_rows_side_effect is not None:
data_api.mutate_rows.side_effect = mutate_rows_side_effect
data_api.mutate_rows.return_value = [response]

worker = _make_worker(client, table.name, rows=rows)
Expand Down Expand Up @@ -1785,6 +1792,52 @@ def test_rmrw_do_mutate_retryable_rows_w_retryable_error():
)


def test_rmrw_do_mutate_retryable_rows_w_retryable_error_internal_rst_stream_error():
    # Mutate two rows.
    # mutate_rows raises an internal server error carrying an RST STREAM
    # style message (checked in both its listed form and upper-cased).
    # No error should be raised by the helper; the request is retried.
    from google.api_core.exceptions import InternalServerError
    from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES

    row_cells = [
        (b"row_key_1", ("cf", b"col", b"value1")),
        (b"row_key_2", ("cf", b"col", b"value2")),
    ]
    responses = ()

    # Each listed message followed by its upper-cased variant, in order.
    message_variants = (
        variant
        for listed_message in RETRYABLE_INTERNAL_ERROR_MESSAGES
        for variant in (listed_message, listed_message.upper())
    )

    for message in message_variants:
        _do_mutate_retryable_rows_helper(
            row_cells,
            responses,
            retryable_error=True,
            mutate_rows_side_effect=InternalServerError(message),
        )


def test_rmrw_do_mutate_rows_w_retryable_error_internal_not_retryable():
    # Mutate two rows.
    # mutate_rows raises an internal server error whose message is not one
    # of the RST STREAM style messages.
    # The InternalServerError should propagate out of the helper.
    from google.api_core.exceptions import InternalServerError

    row_cells = [
        (b"row_key_1", ("cf", b"col", b"value1")),
        (b"row_key_2", ("cf", b"col", b"value2")),
    ]
    responses = ()
    non_retryable_error = InternalServerError("Error not retryable.")

    with pytest.raises(InternalServerError):
        _do_mutate_retryable_rows_helper(
            row_cells,
            responses,
            mutate_rows_side_effect=non_retryable_error,
        )


def test_rmrw_do_mutate_retryable_rows_retry():
#
# Setup:
Expand Down

0 comments on commit d24574a

Please sign in to comment.