perf: optimize row merging (#628)
This PR rewrites the row merging logic to be more correct and to improve performance:
- extract the row merging logic into its own class to reduce the complexity of ReadRows handling
- use OrderedDict instead of dict() for `{family: { qualifier: [] }}` data; this should maintain server-side ordering (families in creation order, qualifiers in lexicographical order)
- define an explicit state machine with states implemented as methods
- add various optimizations (sketched below), like:
  - __slots__ on hot objects to avoid per-instance dict lookups
  - avoiding dict lookups for contiguous family and qualifier keys

Overall, this improves performance by 20% and, in my opinion, makes the code a lot more readable.
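
As a minimal, hypothetical sketch (not the PR's actual `_RowMerger`; the names `_CellSketch`, `_MergerSketch`, and `process` are invented for illustration), the optimizations above combine like this — nested OrderedDicts for server-side ordering, states as bound methods, `__slots__` on the hot per-cell objects, and a cached list for contiguous (family, qualifier) keys:

from collections import OrderedDict

class _CellSketch:
    # __slots__ drops the per-instance __dict__, shrinking these hot,
    # per-cell objects and speeding up attribute access.
    __slots__ = ("timestamp_micros", "value")

    def __init__(self, timestamp_micros, value):
        self.timestamp_micros = timestamp_micros
        self.value = value

class _MergerSketch:
    __slots__ = ("state", "cells", "_last_key", "_working_list")

    def __init__(self):
        # The current state is a bound method: dispatching a chunk is a
        # single call rather than an if/elif ladder over a state enum.
        self.state = self._on_row_start
        # Nested OrderedDicts preserve insertion order, mirroring the
        # order in which the server emits families and qualifiers.
        self.cells = OrderedDict()
        self._last_key = None  # (family, qualifier) of the previous chunk
        self._working_list = None

    def process(self, family, qualifier, timestamp_micros, value):
        self.state(family, qualifier, timestamp_micros, value)

    def _on_row_start(self, family, qualifier, timestamp_micros, value):
        self._append(family, qualifier, timestamp_micros, value)
        self.state = self._on_row_in_progress

    def _on_row_in_progress(self, family, qualifier, timestamp_micros, value):
        self._append(family, qualifier, timestamp_micros, value)

    def _append(self, family, qualifier, timestamp_micros, value):
        # Contiguous chunks for the same column reuse the cached cell
        # list, skipping two dict lookups per cell.
        if (family, qualifier) != self._last_key:
            fam = self.cells.setdefault(family, OrderedDict())
            self._working_list = fam.setdefault(qualifier, [])
            self._last_key = (family, qualifier)
        self._working_list.append(_CellSketch(timestamp_micros, value))

For example, feeding two chunks for the same column hits the cached list on the second call:

merger = _MergerSketch()
merger.process("cf1", b"q1", 1000, b"a")
merger.process("cf1", b"q1", 2000, b"b")  # same (family, qualifier): no dict lookups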
igorbernstein2 committed Aug 17, 2022
1 parent e862460 commit c71ec70
Showing 4 changed files with 470 additions and 279 deletions.
206 changes: 41 additions & 165 deletions google/cloud/bigtable/row_data.py
@@ -18,45 +18,25 @@
import copy

import grpc # type: ignore

import warnings
from google.api_core import exceptions
from google.api_core import retry
from google.cloud._helpers import _to_bytes # type: ignore

from google.cloud.bigtable.row_merger import _RowMerger, _State
from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2
from google.cloud.bigtable_v2.types import data as data_v2_pb2
from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData


# Some classes need to be re-exported here to keep backwards
# compatibility. Those classes were moved to row_merger, but we don't want to
# break end users' imports. This hack ensures they don't get marked as unused.
_ = (Cell, InvalidChunk, PartialRowData)


class PartialCellData(object):
"""Representation of partial cell in a Google Cloud Bigtable Table.
These are expected to be updated directly from a
:class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
:type row_key: bytes
:param row_key: The key for the row holding the (partial) cell.
:type family_name: str
:param family_name: The family name of the (partial) cell.
:type qualifier: bytes
:param qualifier: The column qualifier of the (partial) cell.
:type timestamp_micros: int
:param timestamp_micros: The timestamp (in microseconds) of the
(partial) cell.
:type labels: list of str
:param labels: labels assigned to the (partial) cell
:type value: bytes
:param value: The (accumulated) value of the (partial) cell.
"""
class PartialCellData(object): # pragma: NO COVER
"""This class is no longer used and will be removed in the future"""

def __init__(
self, row_key, family_name, qualifier, timestamp_micros, labels=(), value=b""
@@ -69,11 +49,6 @@ def __init__(
self.value = value

def append_value(self, value):
"""Append bytes from a new chunk to value.
:type value: bytes
:param value: bytes to append
"""
self.value += value


@@ -168,14 +143,7 @@ class PartialRowsData(object):
def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
# Counter for rows returned to the user
self._counter = 0
# In-progress row, unset until first response, after commit/reset
self._row = None
# Last complete row, unset until first commit
self._previous_row = None
# In-progress cell, unset until first response, after completion
self._cell = None
# Last complete cell, unset until first completion, after new row
self._previous_cell = None
self._row_merger = _RowMerger()

# May be cached from previous response
self.last_scanned_row_key = None
@@ -192,20 +160,35 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
self.response_iterator = read_method(request, timeout=self.retry._deadline + 1)

self.rows = {}
self._state = self.STATE_NEW_ROW

# Flag to stop iteration, for any reason not related to self.retry()
self._cancelled = False

@property
def state(self):
"""State machine state.
:rtype: str
:returns: name of state corresponding to current row / chunk
processing.
def state(self): # pragma: NO COVER
"""
DEPRECATED: this property is deprecated and will be removed in the
future.
"""
return self.read_states[self._state]
warnings.warn(
"`PartialRowsData#state()` is deprecated and will be removed in the future",
DeprecationWarning,
stacklevel=2,
)

# Best effort: try to map internal RowMerger states to old strings for
# backwards compatibility
internal_state = self._row_merger.state
if internal_state == _State.ROW_START:
return self.NEW_ROW
# note: _State.CELL_START and _State.CELL_COMPLETE are transient states
# and will not be visible between chunks
elif internal_state == _State.CELL_IN_PROGRESS:
return self.CELL_IN_PROGRESS
elif internal_state == _State.ROW_COMPLETE:
return self.NEW_ROW
else:
raise RuntimeError("unexpected internal state: " + str(internal_state))

def cancel(self):
"""Cancels the iterator, closing the stream."""
@@ -241,6 +224,7 @@ def _on_error(self, exc):
if self.last_scanned_row_key:
retry_request = self._create_retry_request()

self._row_merger = _RowMerger(self._row_merger.last_seen_row_key)
self.response_iterator = self.read_method(retry_request)

def _read_next(self):
@@ -266,125 +250,23 @@ def __iter__(self):
try:
response = self._read_next_response()
except StopIteration:
if self.state != self.NEW_ROW:
raise ValueError("The row remains partial / is not committed.")
self._row_merger.finalize()
break
except InvalidRetryRequest:
self._cancelled = True
break

for chunk in response.chunks:
for row in self._row_merger.process_chunks(response):
self.last_scanned_row_key = self._row_merger.last_seen_row_key
self._counter += 1

yield row

if self._cancelled:
break
self._process_chunk(chunk)
if chunk.commit_row:
self.last_scanned_row_key = self._previous_row.row_key
self._counter += 1
yield self._previous_row

resp_last_key = response.last_scanned_row_key
if resp_last_key and resp_last_key > self.last_scanned_row_key:
self.last_scanned_row_key = resp_last_key

def _process_chunk(self, chunk):
if chunk.reset_row:
self._validate_chunk_reset_row(chunk)
self._row = None
self._cell = self._previous_cell = None
self._state = self.STATE_NEW_ROW
return

self._update_cell(chunk)

if self._row is None:
if (
self._previous_row is not None
and self._cell.row_key <= self._previous_row.row_key
):
raise InvalidChunk()
self._row = PartialRowData(self._cell.row_key)

if chunk.value_size == 0:
self._state = self.STATE_ROW_IN_PROGRESS
self._save_current_cell()
else:
self._state = self.STATE_CELL_IN_PROGRESS

if chunk.commit_row:
if chunk.value_size > 0:
raise InvalidChunk()

self._previous_row = self._row
self._row = None
self._previous_cell = None
self._state = self.STATE_NEW_ROW

def _update_cell(self, chunk):
if self._cell is None:
qualifier = None
if chunk.HasField("qualifier"):
qualifier = chunk.qualifier.value

family = None
if chunk.HasField("family_name"):
family = chunk.family_name.value

self._cell = PartialCellData(
chunk.row_key,
family,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value,
)
self._copy_from_previous(self._cell)
self._validate_cell_data_new_cell()
else:
self._cell.append_value(chunk.value)

def _validate_cell_data_new_cell(self):
cell = self._cell
if not cell.row_key or not cell.family_name or cell.qualifier is None:
raise InvalidChunk()

prev = self._previous_cell
if prev and prev.row_key != cell.row_key:
raise InvalidChunk()

def _validate_chunk_reset_row(self, chunk):
# No reset for new row
_raise_if(self._state == self.STATE_NEW_ROW)

# No reset with other keys
_raise_if(chunk.row_key)
_raise_if(chunk.HasField("family_name"))
_raise_if(chunk.HasField("qualifier"))
_raise_if(chunk.timestamp_micros)
_raise_if(chunk.labels)
_raise_if(chunk.value_size)
_raise_if(chunk.value)
_raise_if(chunk.commit_row)

def _save_current_cell(self):
"""Helper for :meth:`consume_next`."""
row, cell = self._row, self._cell
family = row._cells.setdefault(cell.family_name, {})
qualified = family.setdefault(cell.qualifier, [])
complete = Cell.from_pb(cell)
qualified.append(complete)
self._cell, self._previous_cell = None, cell

def _copy_from_previous(self, cell):
"""Helper for :meth:`consume_next`."""
previous = self._previous_cell
if previous is not None:
if not cell.row_key:
cell.row_key = previous.row_key
if not cell.family_name:
cell.family_name = previous.family_name
# NOTE: ``cell.qualifier`` **can** be empty string.
if cell.qualifier is None:
cell.qualifier = previous.qualifier
# The last response might not have generated any rows, but it
# could've updated last_scanned_row_key
self.last_scanned_row_key = self._row_merger.last_seen_row_key


class _ReadRowsRequestManager(object):
@@ -494,9 +376,3 @@ def _start_key_set(row_range):
def _end_key_set(row_range):
"""Helper for :meth:`_filter_row_ranges`"""
return row_range.end_key_open or row_range.end_key_closed


def _raise_if(predicate, *args):
"""Helper for validation methods."""
if predicate:
raise InvalidChunk(*args)
