fix: first pass on making retry configuration more consistent (#695)
* fix: first pass on making retry configuration more consistent

Currently ReadRows uses the Retry.deadline configuration inconsistently:
- it's used as the attempt timeout for the first retry attempt
- it's used as a limit for retry scheduling when reading a single row

Conceptually there are 3 timeouts that are relevant to ReadRows:
- attempt timeout: how long a single RPC is allowed to run; this should map directly to a gRPC deadline
- overall timeout: how long we should wait across all of the retry attempts, possibly truncating the last attempt timeout
- read timeout: how long we are willing to wait for the next row in a stream

Ideally, Retry.deadline would represent an operation deadline (since that's the primary concern of the end user). However, there is no backwards-compatible way to do this: changing the behavior would cause existing applications to start enforcing a very short deadline.

This PR tries to improve the situation in a backwards compatible way:
- keep Retry.deadline as a read timeout
- introduce a new parameter for overall timeout

This results in a less-than-ideal API, but avoids breaking existing applications.
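
As a rough usage sketch (hedged: the table setup, row keys, and handler below are hypothetical, not part of this change), the new parameters are passed alongside the existing retry configuration:

    # Assumes an existing Bigtable `table` object; `process` is a hypothetical handler.
    rows = table.read_rows(
        start_key=b"user-0000",
        end_key=b"user-9999",
        attempt_timeout=20.0,    # bound each ReadRows RPC to 20 seconds
        overall_timeout=120.0,   # bound the whole operation, across retries, to 120 seconds
    )
    for row in rows:
        process(row)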

* fix old test

* add attempt timeout

* lint

* add some tests

* lint

* refactor confusing logic

* apply fixes from review

* address feedback
igorbernstein2 committed Nov 18, 2022
1 parent 5c72780 commit c707c30
Showing 4 changed files with 185 additions and 29 deletions.
67 changes: 58 additions & 9 deletions google/cloud/bigtable/row_data.py
@@ -16,6 +16,8 @@


import copy
import time

import six

import grpc
@@ -342,6 +344,10 @@ def _retry_read_rows_exception(exc):
initial=1.0,
maximum=15.0,
multiplier=2.0,
# NOTE: this is a soft read timeout: it limits how long we are willing
# to schedule retry attempts to read the next row. It does not set the
# RPC timeout. Please use the separate overall_timeout parameter of read_rows
# to limit the overall operation duration.
deadline=60.0, # 60 seconds
)
"""The default retry strategy to be used on retry-able errors.
@@ -389,7 +395,14 @@ class PartialRowsData(object):
STATE_CELL_IN_PROGRESS: CELL_IN_PROGRESS,
}

def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
def __init__(
self,
read_method,
request,
retry=DEFAULT_RETRY_READ_ROWS,
attempt_timeout=None,
overall_timeout=None,
):
# Counter for rows returned to the user
self._counter = 0
# In-progress row, unset until first response, after commit/reset
@@ -406,14 +419,14 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
self.read_method = read_method
self.request = request
self.retry = retry
self._attempt_timeout = attempt_timeout
# absolute timestamp when all retry attempts should end
if overall_timeout:
self._overall_deadline = time.time() + overall_timeout
else:
self._overall_deadline = None

# The `timeout` parameter must be somewhat greater than the value
# contained in `self.retry`, in order to avoid race-like condition and
# allow registering the first deadline error before invoking the retry.
# Otherwise there is a risk of entering an infinite loop that resets
# the timeout counter just before it being triggered. The increment
# by 1 second here is customary but should not be much less than that.
self.response_iterator = read_method(request, timeout=self.retry._deadline + 1)
self.response_iterator = self._create_read_stream(request)

self.rows = {}
self._state = self.STATE_NEW_ROW
@@ -451,6 +464,28 @@ class as a generator instead.
for row in self:
self.rows[row.row_key] = row

@property
def remaining_overall_timeout(self):
"""Returns the remaining deadline allotted for the entire stream.
Returns a float of seconds"""
if not self._overall_deadline:
return None

return self._overall_deadline - time.time()

def _create_read_stream(self, req):
"""Starts a new RPC bounded by the overall deadline and attempt timeout.
:type req: class:`data_messages_v2_pb2.ReadRowsRequest`
"""
effective_timeout = self.remaining_overall_timeout
if effective_timeout is None:
effective_timeout = self._attempt_timeout
elif self._attempt_timeout is not None:
effective_timeout = min(effective_timeout, self._attempt_timeout)

return self.read_method(req, timeout=effective_timeout)

def _create_retry_request(self):
"""Helper for :meth:`__iter__`."""
req_manager = _ReadRowsRequestManager(
@@ -465,7 +500,7 @@ def _on_error(self, exc):
if self.last_scanned_row_key:
retry_request = self._create_retry_request()

self.response_iterator = self.read_method(retry_request)
self.response_iterator = self._create_read_stream(retry_request)

def _read_next(self):
"""Helper for :meth:`__iter__`."""
@@ -476,6 +511,20 @@ def _read_next(self):

def _read_next_response(self):
"""Helper for :meth:`__iter__`."""
# Calculate the maximum amount of time for which retries should be scheduled.
# This will not actually set any deadlines; it only limits the
# duration of time that we are willing to schedule retries for.
remaining_overall_timeout = self.remaining_overall_timeout

if remaining_overall_timeout is not None:
# we want to make sure that the retry logic doesn't retry after the
# operation deadline has passed
if (
self.retry.deadline is None
or self.retry.deadline > remaining_overall_timeout
):
self.retry = self.retry.with_deadline(remaining_overall_timeout)

return self.retry(self._read_next, on_error=self._on_error)()

def __iter__(self):
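
To make the timeout clamping in _create_read_stream concrete, here is a small standalone sketch (an illustration of the logic above, not code from the diff) of how the effective per-RPC timeout is derived, treating None as unbounded:

    def effective_timeout(remaining_overall, attempt_timeout):
        # mirrors _create_read_stream: the RPC deadline is the smaller of the
        # remaining overall budget and the per-attempt timeout, when both are set
        if remaining_overall is None:
            return attempt_timeout
        if attempt_timeout is None:
            return remaining_overall
        return min(remaining_overall, attempt_timeout)

    assert effective_timeout(None, None) is None   # no limits: no gRPC deadline is set
    assert effective_timeout(None, 5.0) == 5.0     # only the attempt timeout applies
    assert effective_timeout(7.0, None) == 7.0     # only the overall budget applies
    assert effective_timeout(7.0, 5.0) == 5.0      # attempt timeout caps the RPC
    assert effective_timeout(3.0, 5.0) == 3.0      # overall budget truncates the attempt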
44 changes: 40 additions & 4 deletions google/cloud/bigtable/table.py
@@ -481,7 +481,7 @@ def get_cluster_states(self):
for cluster_id, value_pb in table_pb.cluster_states.items()
}

def read_row(self, row_key, filter_=None):
def read_row(self, row_key, filter_=None, overall_timeout=60):
"""Read a single row from this table.
For example:
@@ -506,7 +506,11 @@ def read_row(self, row_key, filter_=None):
"""
row_set = RowSet()
row_set.add_row_key(row_key)
result_iter = iter(self.read_rows(filter_=filter_, row_set=row_set))
result_iter = iter(
self.read_rows(
filter_=filter_, row_set=row_set, overall_timeout=overall_timeout
)
)
row = next(result_iter, None)
if next(result_iter, None) is not None:
raise ValueError("More than one row was returned.")
@@ -521,6 +525,8 @@ def read_rows(
end_inclusive=False,
row_set=None,
retry=DEFAULT_RETRY_READ_ROWS,
attempt_timeout=None,
overall_timeout=None,
):
"""Read rows from this table.
@@ -565,7 +571,22 @@ def read_rows(
default value :attr:`DEFAULT_RETRY_READ_ROWS` can be used and
modified with the :meth:`~google.api_core.retry.Retry.with_delay`
method or the :meth:`~google.api_core.retry.Retry.with_deadline`
method.
method. This retry object is used to try to fetch the next row:
this means that the deadline specified by this object is reset
after every row read. Furthermore, this deadline is loosely enforced:
it will only prevent additional attempts from being scheduled after the
deadline; it will not limit how long a single attempt to read the
next row will run. Prefer to use overall_timeout below.
:type attempt_timeout: float
:param attempt_timeout: (Optional) the timeout for executing a
single RPC attempt. If this attempt fails and there is overall_timeout
remaining, another attempt will be sent.
:type overall_timeout: float
:param overall_timeout: (Optional) the overall operation deadline
to completely read the entire ReadRows stream.
:rtype: :class:`.PartialRowsData`
:returns: A :class:`.PartialRowsData` a generator for consuming
@@ -582,7 +603,13 @@ def read_rows(
row_set=row_set,
)
data_client = self._instance._client.table_data_client
return PartialRowsData(data_client.transport.read_rows, request_pb, retry)
return PartialRowsData(
data_client.transport.read_rows,
request_pb,
retry,
attempt_timeout=attempt_timeout,
overall_timeout=overall_timeout,
)

def yield_rows(self, **kwargs):
"""Read rows from this table.
Expand Down Expand Up @@ -615,6 +642,15 @@ def yield_rows(self, **kwargs):
:param row_set: (Optional) The row set containing multiple row keys and
row_ranges.
:type attempt_timeout: float
:param attempt_timeout: (Optional) the timeout for executing a
single RPC attempt. If this attempt fails and there is overall_timeout
remaining, another attempt will be sent.
:type overall_timeout: float
:param overall_timeout: (Optional) the overall operation deadline
to completely read the entire ReadRows stream.
:rtype: :class:`.PartialRowData`
:returns: A :class:`.PartialRowData` for each row returned
"""
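
A brief usage note on the read_row change above (hedged; the row key is hypothetical): the single-row helper now accepts an overall_timeout, defaulting to 60 seconds, and forwards it to read_rows:

    # Assumes an existing `table`; b"user-0001" is a hypothetical key.
    row = table.read_row(b"user-0001", overall_timeout=30.0)  # give up after 30s total
    if row is None:
        print("row not found")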
22 changes: 14 additions & 8 deletions tests/unit/test_row_data.py
@@ -370,20 +370,26 @@ def test_constructor(self):
self.assertEqual(partial_rows_data.rows, {})
self.assertEqual(partial_rows_data.retry, DEFAULT_RETRY_READ_ROWS)

def test_constructor_with_retry(self):
from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS

def test_constructor_with_overall_timeout(self):
client = _Client()
client._data_stub = mock.MagicMock()
request = object()
retry = DEFAULT_RETRY_READ_ROWS
partial_rows_data = self._make_one(client._data_stub.ReadRows, request, retry)
partial_rows_data.read_method.assert_called_once_with(
request, timeout=DEFAULT_RETRY_READ_ROWS.deadline + 1
partial_rows_data = self._make_one(
client._data_stub.ReadRows, request, overall_timeout=11
)
partial_rows_data.read_method.assert_called_once_with(request, timeout=mock.ANY)

# the deadline being passed to the first RPC should be slightly less
# than 11, but to avoid flakiness on slow test runners, it's padded down
# by 3 secs
self.assertLess(8, partial_rows_data.read_method.call_args.kwargs["timeout"])

self.assertIs(partial_rows_data.request, request)
self.assertEqual(partial_rows_data.rows, {})
self.assertEqual(partial_rows_data.retry, retry)
# The remaining overall timeout should be slightly less than 11, but to
# avoid flakiness on slow test runners, it's padded down by 3 secs
self.assertLess(8, partial_rows_data.remaining_overall_timeout)
self.assertLessEqual(partial_rows_data.remaining_overall_timeout, 11)

def test___eq__(self):
client = _Client()
81 changes: 73 additions & 8 deletions tests/unit/test_table.py
@@ -11,8 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import unittest

import mock
@@ -798,17 +797,12 @@ def mock_create_row_request(table_name, **kwargs):

def test_read_retry_rows(self):
from google.cloud.bigtable_v2.gapic import bigtable_client
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
from google.api_core import retry

data_api = bigtable_client.BigtableClient(mock.Mock())
table_api = bigtable_table_admin_client.BigtableTableAdminClient(mock.Mock())
credentials = _make_credentials()
client = self._make_client(
project="project-id", credentials=credentials, admin=True
)
client = self._make_client(project="project-id", credentials=credentials)
client._table_data_client = data_api
client._table_admin_client = table_api
instance = client.instance(instance_id=self.INSTANCE_ID)
table = self._make_one(self.TABLE_ID, instance)

@@ -857,6 +851,77 @@ def test_read_retry_rows(self):
result = rows[1]
self.assertEqual(result.row_key, self.ROW_KEY_2)

def test_read_retry_rows_timeouts(self):
from google.cloud.bigtable_v2.gapic import bigtable_client

data_api = bigtable_client.BigtableClient(mock.Mock())
credentials = _make_credentials()
client = self._make_client(project="project-id", credentials=credentials)
client._table_data_client = data_api
instance = client.instance(instance_id=self.INSTANCE_ID)
table = self._make_one(self.TABLE_ID, instance)

# Patch the stub used by the API method.
client._table_data_client.transport.read_rows = mock.Mock(
side_effect=[_MockReadRowsIterator()]
)

# By default there is no timeout
list(table.read_rows())
self.assertIsNone(
client._table_data_client.transport.read_rows.call_args.kwargs["timeout"]
)

# attempt timeout should be passed thru
client._table_data_client.transport.read_rows = mock.Mock(
side_effect=[_MockReadRowsIterator()]
)
list(table.read_rows(attempt_timeout=1.0))
self.assertEqual(
1.0,
client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
)

# overall timeout should be passed thru
client._table_data_client.transport.read_rows = mock.Mock(
side_effect=[_MockReadRowsIterator()]
)
list(table.read_rows(overall_timeout=10.0))
# The RPC timeout should be slightly less than 10.0, but to avoid test
# flakiness it's padded by a couple of secs.
self.assertLess(
8.0,
client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
)

# attempt timeout caps the RPC timeout even when the overall timeout is larger
client._table_data_client.transport.read_rows = mock.Mock(
side_effect=[_MockReadRowsIterator()]
)
list(table.read_rows(attempt_timeout=5.0, overall_timeout=10.0))
self.assertEqual(
5.0,
client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
)

# attempt timeout is truncated by overall timeout
class DelayedFailureIterator(object):
def next(self):
time.sleep(0.75)
raise DeadlineExceeded("delayed error")

__next__ = next

client._table_data_client.transport.read_rows = mock.Mock(
side_effect=[DelayedFailureIterator(), _MockReadRowsIterator()]
)
list(table.read_rows(attempt_timeout=1.0, overall_timeout=1.0))

self.assertGreater(
1.0,
client._table_data_client.transport.read_rows.call_args.kwargs["timeout"],
)

def test_yield_retry_rows(self):
from google.cloud.bigtable_v2.gapic import bigtable_client
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
Expand Down
