From 1ea5c89838f455c5608e639f501ba2e1de53faae Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Thu, 26 Feb 2026 17:06:49 +0000 Subject: [PATCH 1/6] feat: Rerouted ReadRows to data client --- google/cloud/bigtable/row.py | 18 + google/cloud/bigtable/row_data.py | 299 +------ google/cloud/bigtable/row_merger.py | 250 ------ google/cloud/bigtable/row_set.py | 13 - google/cloud/bigtable/table.py | 75 +- tests/system/v2_client/test_data_api.py | 38 +- tests/unit/v2_client/test_row_data.py | 1056 ++++------------------- tests/unit/v2_client/test_row_merger.py | 175 +--- tests/unit/v2_client/test_row_set.py | 22 - tests/unit/v2_client/test_table.py | 549 ++---------- 10 files changed, 371 insertions(+), 2124 deletions(-) delete mode 100644 google/cloud/bigtable/row_merger.py diff --git a/google/cloud/bigtable/row.py b/google/cloud/bigtable/row.py index 46bfd94eb..273b037a1 100644 --- a/google/cloud/bigtable/row.py +++ b/google/cloud/bigtable/row.py @@ -1009,6 +1009,16 @@ def __init__(self, row_key): self._row_key = row_key self._cells = {} + @classmethod + def _from_data_client_row(cls, row): + partial_row_data = cls(row.row_key) + for column_family in row._index: + columns = {} + for column, items in row._index[column_family].items(): + columns[column] = [Cell._from_data_client_cell(item) for item in items] + partial_row_data._cells[column_family] = columns + return partial_row_data + def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented @@ -1212,6 +1222,14 @@ def from_pb(cls, cell_pb): else: return cls(cell_pb.value, cell_pb.timestamp_micros) + @classmethod + def _from_data_client_cell(cls, cell): + return cls( + cell.value, + cell.timestamp_micros, + cell.labels, + ) + @property def timestamp(self): return _datetime_from_microseconds(self.timestamp_micros) diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py index b333d9c6a..7bdd17304 100644 --- a/google/cloud/bigtable/row_data.py +++ b/google/cloud/bigtable/row_data.py @@ -15,17 +15,9 @@ """Container for Google Cloud Bigtable Cells and Streaming Row Contents.""" -import copy - -import grpc # type: ignore -import warnings from google.api_core import exceptions from google.api_core import retry -from google.cloud._helpers import _to_bytes # type: ignore -from google.cloud.bigtable.row_merger import _RowMerger, _State -from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2 -from google.cloud.bigtable_v2.types import data as data_v2_pb2 from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData @@ -60,36 +52,7 @@ class InvalidRetryRequest(RuntimeError): """Exception raised when retry request is invalid.""" -RETRYABLE_INTERNAL_ERROR_MESSAGES = ( - "rst_stream", - "rst stream", - "received unexpected eos on data frame from server", -) -"""Internal error messages that can be retried during read row and mutation.""" - - -def _retriable_internal_server_error(exc): - """ - Return True if the internal server error is retriable. 
- """ - return isinstance(exc, exceptions.InternalServerError) and any( - retryable_message in exc.message.lower() - for retryable_message in RETRYABLE_INTERNAL_ERROR_MESSAGES - ) - - -def _retry_read_rows_exception(exc): - """Return True if the exception is retriable for read row requests.""" - if isinstance(exc, grpc.RpcError): - exc = exceptions.from_grpc_error(exc) - - return _retriable_internal_server_error(exc) or isinstance( - exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded) - ) - - DEFAULT_RETRY_READ_ROWS = retry.Retry( - predicate=_retry_read_rows_exception, initial=1.0, maximum=15.0, multiplier=2.0, @@ -108,94 +71,20 @@ class PartialRowsData(object): :type read_method: :class:`client._table_data_client.read_rows` :param read_method: ``ReadRows`` method. - :type request: :class:`data_messages_v2_pb2.ReadRowsRequest` - :param request: The ``ReadRowsRequest`` message used to create a - ReadRowsResponse iterator. If the iterator fails, a new - iterator is created, allowing the scan to continue from - the point just beyond the last successfully read row, - identified by self.last_scanned_row_key. The retry happens - inside of the Retry class, using a predicate for the - expected exceptions during iteration. - - :type retry: :class:`~google.api_core.retry.Retry` - :param retry: (Optional) Retry delay and deadline arguments. To override, - the default value :attr:`DEFAULT_RETRY_READ_ROWS` can be - used and modified with the - :meth:`~google.api_core.retry.Retry.with_delay` method - or the - :meth:`~google.api_core.retry.Retry.with_deadline` method. + :type generator: :class:`Iterable[Row]` + :param generator: The `Row` iterator from :meth:`Table.read_rows` """ - NEW_ROW = "New row" # No cells yet complete for row - ROW_IN_PROGRESS = "Row in progress" # Some cells complete for row - CELL_IN_PROGRESS = "Cell in progress" # Incomplete cell for row - - STATE_NEW_ROW = 1 - STATE_ROW_IN_PROGRESS = 2 - STATE_CELL_IN_PROGRESS = 3 - - read_states = { - STATE_NEW_ROW: NEW_ROW, - STATE_ROW_IN_PROGRESS: ROW_IN_PROGRESS, - STATE_CELL_IN_PROGRESS: CELL_IN_PROGRESS, - } - - def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS): - # Counter for rows returned to the user - self._counter = 0 - self._row_merger = _RowMerger() - - # May be cached from previous response - self.last_scanned_row_key = None - self.read_method = read_method - self.request = request - self.retry = retry - - # The `timeout` parameter must be somewhat greater than the value - # contained in `self.retry`, in order to avoid race-like condition and - # allow registering the first deadline error before invoking the retry. - # Otherwise there is a risk of entering an infinite loop that resets - # the timeout counter just before it being triggered. The increment - # by 1 second here is customary but should not be much less than that. - self.response_iterator = read_method( - request, timeout=self.retry._deadline + 1, retry=self.retry - ) - + def __init__(self, generator): + self._generator = generator self.rows = {} # Flag to stop iteration, for any reason not related to self.retry() self._cancelled = False - @property - def state(self): # pragma: NO COVER - """ - DEPRECATED: this property is deprecated and will be removed in the - future. 
- """ - warnings.warn( - "`PartialRowsData#state()` is deprecated and will be removed in the future", - DeprecationWarning, - stacklevel=2, - ) - - # Best effort: try to map internal RowMerger states to old strings for - # backwards compatibility - internal_state = self._row_merger.state - if internal_state == _State.ROW_START: - return self.NEW_ROW - # note: _State.CELL_START, _State.CELL_COMPLETE are transient states - # and will not be visible in between chunks - elif internal_state == _State.CELL_IN_PROGRESS: - return self.CELL_IN_PROGRESS - elif internal_state == _State.ROW_COMPLETE: - return self.NEW_ROW - else: - raise RuntimeError("unexpected internal state: " + self._) - def cancel(self): """Cancels the iterator, closing the stream.""" self._cancelled = True - self.response_iterator.cancel() def consume_all(self, max_loops=None): """Consume the streamed responses until there are no more. @@ -212,169 +101,23 @@ class as a generator instead. for row in self: self.rows[row.row_key] = row - def _create_retry_request(self): - """Helper for :meth:`__iter__`.""" - req_manager = _ReadRowsRequestManager( - self.request, self.last_scanned_row_key, self._counter - ) - return req_manager.build_updated_request() - - def _on_error(self, exc): - """Helper for :meth:`__iter__`.""" - # restart the read scan from AFTER the last successfully read row - retry_request = self.request - if self.last_scanned_row_key: - retry_request = self._create_retry_request() - - self._row_merger = _RowMerger(self._row_merger.last_seen_row_key) - self.response_iterator = self.read_method(retry_request, retry=self.retry) - - def _read_next(self): - """Helper for :meth:`__iter__`.""" - return next(self.response_iterator) - - def _read_next_response(self): - """Helper for :meth:`__iter__`.""" - resp_protoplus = self.retry(self._read_next, on_error=self._on_error)() - # unwrap the underlying protobuf, there is a significant amount of - # overhead that protoplus imposes for very little gain. The protos - # are not user visible, so we just use the raw protos for merging. - return data_messages_v2_pb2.ReadRowsResponse.pb(resp_protoplus) - def __iter__(self): - """Consume the ``ReadRowsResponse`` s from the stream. - Read the rows and yield each to the reader - - Parse the response and its chunks into a new/existing row in - :attr:`_rows`. Rows are returned in order by row key. + """Consume the ``Row`` s from the stream. + Convert them to ``PartialRowData`` and yield each to the reader. """ - while not self._cancelled: - try: - response = self._read_next_response() - except StopIteration: - self._row_merger.finalize() - break - except InvalidRetryRequest: - self._cancelled = True - break - - for row in self._row_merger.process_chunks(response): - self.last_scanned_row_key = self._row_merger.last_seen_row_key - self._counter += 1 - - yield row - + try: + for row in self._generator: if self._cancelled: - break - # The last response might not have generated any rows, but it - # could've updated last_scanned_row_key - self.last_scanned_row_key = self._row_merger.last_seen_row_key - - -class _ReadRowsRequestManager(object): - """Update the ReadRowsRequest message in case of failures by - filtering the already read keys. 
- - :type message: class:`data_messages_v2_pb2.ReadRowsRequest` - :param message: Original ReadRowsRequest containing all of the parameters - of API call - - :type last_scanned_key: bytes - :param last_scanned_key: last successfully scanned key - - :type rows_read_so_far: int - :param rows_read_so_far: total no of rows successfully read so far. - this will be used for updating rows_limit - - """ - - def __init__(self, message, last_scanned_key, rows_read_so_far): - self.message = message - self.last_scanned_key = last_scanned_key - self.rows_read_so_far = rows_read_so_far - - def build_updated_request(self): - """Updates the given message request as per last scanned key""" - - resume_request = data_messages_v2_pb2.ReadRowsRequest() - data_messages_v2_pb2.ReadRowsRequest.copy_from(resume_request, self.message) - - if self.message.rows_limit != 0: - row_limit_remaining = self.message.rows_limit - self.rows_read_so_far - if row_limit_remaining > 0: - resume_request.rows_limit = row_limit_remaining - else: - raise InvalidRetryRequest - - # if neither RowSet.row_keys nor RowSet.row_ranges currently exist, - # add row_range that starts with last_scanned_key as start_key_open - # to request only rows that have not been returned yet - if "rows" not in self.message: - row_range = data_v2_pb2.RowRange(start_key_open=self.last_scanned_key) - resume_request.rows = data_v2_pb2.RowSet(row_ranges=[row_range]) - else: - row_keys = self._filter_rows_keys() - row_ranges = self._filter_row_ranges() - - if len(row_keys) == 0 and len(row_ranges) == 0: - # Avoid sending empty row_keys and row_ranges - # if that was not the intention - raise InvalidRetryRequest - - resume_request.rows = data_v2_pb2.RowSet( - row_keys=row_keys, row_ranges=row_ranges - ) - return resume_request - - def _filter_rows_keys(self): - """Helper for :meth:`build_updated_request`""" - return [ - row_key - for row_key in self.message.rows.row_keys - if row_key > self.last_scanned_key - ] - - def _filter_row_ranges(self): - """Helper for :meth:`build_updated_request`""" - new_row_ranges = [] - - for row_range in self.message.rows.row_ranges: - # if current end_key (open or closed) is set, return its value, - # if not, set to empty string (''). 
- # NOTE: Empty string in end_key means "end of table" - end_key = self._end_key_set(row_range) - # if end_key is already read, skip to the next row_range - if end_key and self._key_already_read(end_key): - continue - - # if current start_key (open or closed) is set, return its value, - # if not, then set to empty string ('') - # NOTE: Empty string in start_key means "beginning of table" - start_key = self._start_key_set(row_range) - - # if start_key was already read or doesn't exist, - # create a row_range with last_scanned_key as start_key_open - # to be passed to retry request - retry_row_range = row_range - if self._key_already_read(start_key): - retry_row_range = copy.deepcopy(row_range) - retry_row_range.start_key_closed = _to_bytes("") - retry_row_range.start_key_open = self.last_scanned_key - - new_row_ranges.append(retry_row_range) - - return new_row_ranges - - def _key_already_read(self, key): - """Helper for :meth:`_filter_row_ranges`""" - return key <= self.last_scanned_key - - @staticmethod - def _start_key_set(row_range): - """Helper for :meth:`_filter_row_ranges`""" - return row_range.start_key_open or row_range.start_key_closed - - @staticmethod - def _end_key_set(row_range): - """Helper for :meth:`_filter_row_ranges`""" - return row_range.end_key_open or row_range.end_key_closed + return + else: + yield PartialRowData._from_data_client_row(row) + + # Any exception from the generator should cancel the iterator. A + # timeout, defined by catching a DeadlineExceeded, should be reraised + # as a RetryError instead. + except exceptions.DeadlineExceeded as e: + self.cancel() + raise exceptions.RetryError(e.message, e.__cause__) + except Exception as e: + self.cancel() + raise e diff --git a/google/cloud/bigtable/row_merger.py b/google/cloud/bigtable/row_merger.py deleted file mode 100644 index 515b91df7..000000000 --- a/google/cloud/bigtable/row_merger.py +++ /dev/null @@ -1,250 +0,0 @@ -from enum import Enum -from collections import OrderedDict -from google.cloud.bigtable.row import Cell, PartialRowData, InvalidChunk - -_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row." -_MISSING_COLUMN = ( - "Column {} is not among the cells stored in this row in the column family {}." -) -_MISSING_INDEX = ( - "Index {!r} is not valid for the cells stored in this row for column {} " - "in the column family {}. There are {} such cells." -) - - -class _State(Enum): - ROW_START = "ROW_START" - CELL_START = "CELL_START" - CELL_IN_PROGRESS = "CELL_IN_PROGRESS" - CELL_COMPLETE = "CELL_COMPLETE" - ROW_COMPLETE = "ROW_COMPLETE" - - -class _PartialRow(object): - __slots__ = [ - "row_key", - "cells", - "last_family", - "last_family_cells", - "last_qualifier", - "last_qualifier_cells", - "cell", - ] - - def __init__(self, row_key): - self.row_key = row_key - self.cells = OrderedDict() - - self.last_family = None - self.last_family_cells = OrderedDict() - self.last_qualifier = None - self.last_qualifier_cells = [] - - self.cell = None - - -class _PartialCell(object): - __slots__ = ["family", "qualifier", "timestamp", "labels", "value", "value_index"] - - def __init__(self): - self.family = None - self.qualifier = None - self.timestamp = None - self.labels = None - self.value = None - self.value_index = 0 - - -class _RowMerger(object): - """ - State machine to merge chunks from a response stream into logical rows. - - The implementation is a fairly linear state machine that is implemented as - a method for every state in the _State enum. 
In general the states flow - from top to bottom with some repetition. Each state handler will do some - sanity checks, update in progress data and set the next state. - - There can be multiple state transitions for each chunk, i.e. a single chunk - row will flow from ROW_START -> CELL_START -> CELL_COMPLETE -> ROW_COMPLETE - in a single iteration. - """ - - __slots__ = ["state", "last_seen_row_key", "row"] - - def __init__(self, last_seen_row=b""): - self.last_seen_row_key = last_seen_row - self.state = _State.ROW_START - self.row = None - - def process_chunks(self, response): - """ - Process the chunks in the given response and yield logical rows. - This class will maintain state across multiple response protos. - """ - if response.last_scanned_row_key: - if self.last_seen_row_key >= response.last_scanned_row_key: - raise InvalidChunk("Last scanned row key is out of order") - self.last_seen_row_key = response.last_scanned_row_key - - for chunk in response.chunks: - if chunk.reset_row: - self._handle_reset(chunk) - continue - - if self.state == _State.ROW_START: - self._handle_row_start(chunk) - - if self.state == _State.CELL_START: - self._handle_cell_start(chunk) - - if self.state == _State.CELL_IN_PROGRESS: - self._handle_cell_in_progress(chunk) - - if self.state == _State.CELL_COMPLETE: - self._handle_cell_complete(chunk) - - if self.state == _State.ROW_COMPLETE: - yield self._handle_row_complete(chunk) - elif chunk.commit_row: - raise InvalidChunk( - f"Chunk tried to commit row in wrong state (${self.state})" - ) - - def _handle_reset(self, chunk): - if self.state == _State.ROW_START: - raise InvalidChunk("Bare reset") - if chunk.row_key: - raise InvalidChunk("Reset chunk has a row key") - if chunk.HasField("family_name"): - raise InvalidChunk("Reset chunk has family_name") - if chunk.HasField("qualifier"): - raise InvalidChunk("Reset chunk has qualifier") - if chunk.timestamp_micros: - raise InvalidChunk("Reset chunk has a timestamp") - if chunk.labels: - raise InvalidChunk("Reset chunk has labels") - if chunk.value: - raise InvalidChunk("Reset chunk has a value") - - self.state = _State.ROW_START - self.row = None - - def _handle_row_start(self, chunk): - if not chunk.row_key: - raise InvalidChunk("New row is missing a row key") - if self.last_seen_row_key and self.last_seen_row_key >= chunk.row_key: - raise InvalidChunk("Out of order row keys") - - self.row = _PartialRow(chunk.row_key) - self.state = _State.CELL_START - - def _handle_cell_start(self, chunk): - # Ensure that all chunks after the first one either are missing a row - # key or the row is the same - if self.row.cells and chunk.row_key and chunk.row_key != self.row.row_key: - raise InvalidChunk("row key changed mid row") - - if not self.row.cell: - self.row.cell = _PartialCell() - - # Cells can inherit family/qualifier from previous cells - # However if the family changes, then qualifier must be specified as well - if chunk.HasField("family_name"): - self.row.cell.family = chunk.family_name.value - self.row.cell.qualifier = None - if not self.row.cell.family: - raise InvalidChunk("missing family for a new cell") - - if chunk.HasField("qualifier"): - self.row.cell.qualifier = chunk.qualifier.value - if self.row.cell.qualifier is None: - raise InvalidChunk("missing qualifier for a new cell") - - self.row.cell.timestamp = chunk.timestamp_micros - self.row.cell.labels = chunk.labels - - if chunk.value_size > 0: - # explicitly avoid pre-allocation as it seems that bytearray - # concatenation performs better than slice copies. 
- self.row.cell.value = bytearray() - self.state = _State.CELL_IN_PROGRESS - else: - self.row.cell.value = chunk.value - self.state = _State.CELL_COMPLETE - - def _handle_cell_in_progress(self, chunk): - # if this isn't the first cell chunk, make sure that everything except - # the value stayed constant. - if self.row.cell.value_index > 0: - if chunk.row_key: - raise InvalidChunk("found row key mid cell") - if chunk.HasField("family_name"): - raise InvalidChunk("In progress cell had a family name") - if chunk.HasField("qualifier"): - raise InvalidChunk("In progress cell had a qualifier") - if chunk.timestamp_micros: - raise InvalidChunk("In progress cell had a timestamp") - if chunk.labels: - raise InvalidChunk("In progress cell had labels") - - self.row.cell.value += chunk.value - self.row.cell.value_index += len(chunk.value) - - if chunk.value_size > 0: - self.state = _State.CELL_IN_PROGRESS - else: - self.row.cell.value = bytes(self.row.cell.value) - self.state = _State.CELL_COMPLETE - - def _handle_cell_complete(self, chunk): - # since we are guaranteed that all family & qualifier cells are - # contiguous, we can optimize away the dict lookup by caching the last - # family/qualifier and simply comparing and appending - family_changed = False - if self.row.last_family != self.row.cell.family: - family_changed = True - self.row.last_family = self.row.cell.family - self.row.cells[ - self.row.cell.family - ] = self.row.last_family_cells = OrderedDict() - - if family_changed or self.row.last_qualifier != self.row.cell.qualifier: - self.row.last_qualifier = self.row.cell.qualifier - self.row.last_family_cells[ - self.row.cell.qualifier - ] = self.row.last_qualifier_cells = [] - - self.row.last_qualifier_cells.append( - Cell( - self.row.cell.value, - self.row.cell.timestamp, - self.row.cell.labels, - ) - ) - - self.row.cell.timestamp = 0 - self.row.cell.value = None - self.row.cell.value_index = 0 - - if not chunk.commit_row: - self.state = _State.CELL_START - else: - self.state = _State.ROW_COMPLETE - - def _handle_row_complete(self, chunk): - new_row = PartialRowData(self.row.row_key) - new_row._cells = self.row.cells - - self.last_seen_row_key = new_row.row_key - self.row = None - self.state = _State.ROW_START - - return new_row - - def finalize(self): - """ - Must be called at the end of the stream to ensure there are no unmerged - rows. 
- """ - if self.row or self.state != _State.ROW_START: - raise ValueError("The row remains partial / is not committed.") diff --git a/google/cloud/bigtable/row_set.py b/google/cloud/bigtable/row_set.py index ef6c711ab..0d5ffac19 100644 --- a/google/cloud/bigtable/row_set.py +++ b/google/cloud/bigtable/row_set.py @@ -15,7 +15,6 @@ """User-friendly container for Google Cloud Bigtable RowSet """ -from google.cloud._helpers import _to_bytes from google.cloud.bigtable.data.read_rows_query import ( RowRange as BaseRowRange, ReadRowsQuery, @@ -130,18 +129,6 @@ def add_row_range_with_prefix(self, row_key_prefix): row_key_prefix.encode("utf-8"), end_key.encode("utf-8") ) - def _update_message_request(self, message): - """Add row keys and row range to given request message - - :type message: class:`data_messages_v2_pb2.ReadRowsRequest` - :param message: The ``ReadRowsRequest`` protobuf - """ - for each in self._read_rows_query.row_keys: - message.rows.row_keys._pb.append(_to_bytes(each)) - - for each in self._read_rows_query.row_ranges: - message.rows.row_ranges.append(each._to_pb()) - class RowRange(_MappableAttributesMixin, BaseRowRange): """Convenience wrapper of google.bigtable.v2.RowRange diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py index 8136c3f9a..465383b60 100644 --- a/google/cloud/bigtable/table.py +++ b/google/cloud/bigtable/table.py @@ -35,6 +35,7 @@ RetryExceptionGroup, MutationsExceptionGroup, ) +from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.cloud.bigtable.data.mutations import RowMutationEntry from google.cloud.bigtable.batcher import MutationsBatcher from google.cloud.bigtable.batcher import FLUSH_COUNT, MAX_MUTATION_SIZE @@ -43,12 +44,11 @@ from google.cloud.bigtable.row import AppendRow from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow +from google.cloud.bigtable.row import PartialRowData from google.cloud.bigtable.row_data import PartialRowsData from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS -from google.cloud.bigtable.row_set import RowSet from google.cloud.bigtable.row_set import RowRange from google.cloud.bigtable import enums -from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2 from google.cloud.bigtable.admin import BigtableTableAdminClient from google.cloud.bigtable.admin.types import table as admin_messages_v2_pb2 from google.cloud.bigtable.admin.types import ( @@ -570,18 +570,19 @@ def read_row(self, row_key, filter_=None, retry=DEFAULT_RETRY_READ_ROWS): :rtype: :class:`.PartialRowData`, :data:`NoneType ` :returns: The contents of the row if any chunks were returned in the response, otherwise :data:`None`. - :raises: :class:`ValueError ` if a commit row - chunk is never encountered. 
""" - row_set = RowSet() - row_set.add_row_key(row_key) - result_iter = iter( - self.read_rows(filter_=filter_, row_set=row_set, retry=retry) + attempt_timeout = retry.deadline if retry.deadline else TABLE_DEFAULT.READ_ROWS + row = self._table_impl.read_row( + row_key, + row_filter=filter_, + operation_timeout=TABLE_DEFAULT.READ_ROWS, + attempt_timeout=attempt_timeout, + retryable_errors=TABLE_DEFAULT.READ_ROWS, ) - row = next(result_iter, None) - if next(result_iter, None) is not None: - raise ValueError("More than one row was returned.") - return row + if row is None: + return None + + return PartialRowData._from_data_client_row(row) def read_rows( self, @@ -642,18 +643,22 @@ def read_rows( :returns: A :class:`.PartialRowsData` a generator for consuming the streamed results. """ - request_pb = _create_row_request( - self.name, + attempt_timeout = retry.deadline if retry.deadline else TABLE_DEFAULT.READ_ROWS + query = _create_row_request( start_key=start_key, end_key=end_key, filter_=filter_, limit=limit, end_inclusive=end_inclusive, - app_profile_id=self._app_profile_id, row_set=row_set, ) - data_client = self._instance._client.table_data_client - return PartialRowsData(data_client.read_rows, request_pb, retry) + generator = self._table_impl.read_rows_stream( + query, + operation_timeout=TABLE_DEFAULT.READ_ROWS, + attempt_timeout=attempt_timeout, + retryable_errors=TABLE_DEFAULT.READ_ROWS, + ) + return PartialRowsData(generator) def yield_rows(self, **kwargs): """Read rows from this table. @@ -1212,13 +1217,11 @@ def __ne__(self, other): def _create_row_request( - table_name, start_key=None, end_key=None, filter_=None, limit=None, end_inclusive=False, - app_profile_id=None, row_set=None, ): """Creates a request to read rows in a table. @@ -1249,36 +1252,32 @@ def _create_row_request( :param end_inclusive: (Optional) Whether the ``end_key`` should be considered inclusive. The default is False (exclusive). - :type: app_profile_id: str - :param app_profile_id: (Optional) The unique name of the AppProfile. - :type row_set: :class:`.RowSet` :param row_set: (Optional) The row set containing multiple row keys and row_ranges. - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. + :rtype: :class:`ReadRowsQuery` + :returns: The `ReadRowsQuery` query object corresponding to the inputs. 
:raises: :class:`ValueError ` if both - ``row_set`` and one of ``start_key`` or ``end_key`` are set + ``row_set`` and one of ``start_key`` or ``end_key`` are set, or if + ``end_key`` is not greater than ``start_key`` """ - request_kwargs = {"table_name": table_name} if (start_key is not None or end_key is not None) and row_set is not None: raise ValueError("Row range and row set cannot be " "set simultaneously") - if filter_ is not None: - request_kwargs["filter"] = filter_._to_pb() - if limit is not None: - request_kwargs["rows_limit"] = limit - if app_profile_id is not None: - request_kwargs["app_profile_id"] = app_profile_id - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + query = ReadRowsQuery( + limit=limit, + row_filter=filter_, + ) if start_key is not None or end_key is not None: - row_set = RowSet() - row_set.add_row_range(RowRange(start_key, end_key, end_inclusive=end_inclusive)) + query.add_range(RowRange(start_key, end_key, end_inclusive=end_inclusive)) if row_set is not None: - row_set._update_message_request(message) + for row_key in row_set.row_keys: + query.add_key(row_key) + + for row_range in row_set.row_ranges: + query.add_range(row_range) - return message + return query diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 12ce20b8c..17a606717 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -729,11 +729,12 @@ def test_table_read_rows_retry_unretriable_error_establishing_stream( error_injector = data_table_read_rows_retry_tests.error_injector error_injector.errors_to_inject = [ - error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False) + error_injector.make_exception(StatusCode.DATA_LOSS, fail_mid_stream=False) ] - with pytest.raises(exceptions.Aborted): - data_table_read_rows_retry_tests.read_rows() + rows_data = data_table_read_rows_retry_tests.read_rows() + with pytest.raises(exceptions.DataLoss): + rows_data.consume_all() def test_table_read_rows_retry_retriable_error_establishing_stream( @@ -794,25 +795,25 @@ def test_table_read_rows_retry_retriable_errors_mid_stream( def test_table_read_rows_retry_retriable_internal_errors_mid_stream( data_table_read_rows_retry_tests, ): - from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES + from google.cloud.bigtable.data._helpers import _RETRYABLE_INTERNAL_ERROR_MESSAGES error_injector = data_table_read_rows_retry_tests.error_injector error_injector.errors_to_inject = [ error_injector.make_exception( StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + message=_RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=2, ), error_injector.make_exception( StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[1], + message=_RETRYABLE_INTERNAL_ERROR_MESSAGES[1], fail_mid_stream=True, successes_before_fail=1, ), error_injector.make_exception( StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[2], + message=_RETRYABLE_INTERNAL_ERROR_MESSAGES[2], fail_mid_stream=True, successes_before_fail=0, ), @@ -855,12 +856,12 @@ def test_table_read_rows_retry_retriable_error_mid_stream_unretriable_error_rees error_injector.make_exception( StatusCode.UNAVAILABLE, fail_mid_stream=True, successes_before_fail=5 ), - error_injector.make_exception(StatusCode.ABORTED, fail_mid_stream=False), + error_injector.make_exception(StatusCode.DATA_LOSS, fail_mid_stream=False), ] rows_data = data_table_read_rows_retry_tests.read_rows() - 
with pytest.raises(exceptions.Aborted): + with pytest.raises(exceptions.DataLoss): rows_data.consume_all() @@ -892,27 +893,30 @@ def test_table_read_rows_retry_timeout_mid_stream( from google.api_core import exceptions from google.cloud.bigtable.row_data import ( DEFAULT_RETRY_READ_ROWS, - RETRYABLE_INTERNAL_ERROR_MESSAGES, ) + from google.cloud.bigtable.data._helpers import _RETRYABLE_INTERNAL_ERROR_MESSAGES error_injector = data_table_read_rows_retry_tests.error_injector error_injector.errors_to_inject = [ error_injector.make_exception( StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + message=_RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=5, ), ] + [ error_injector.make_exception( StatusCode.INTERNAL, - message=RETRYABLE_INTERNAL_ERROR_MESSAGES[0], + message=_RETRYABLE_INTERNAL_ERROR_MESSAGES[0], fail_mid_stream=True, successes_before_fail=0, ), ] * 20 # Shorten the deadline so the timeout test is shorter. + data_table_read_rows_retry_tests._table_impl.default_read_rows_operation_timeout = ( + 10.0 + ) rows_data = data_table_read_rows_retry_tests.read_rows( retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) ) @@ -940,10 +944,14 @@ def test_table_read_rows_retry_timeout_establishing_stream( ] * 20 # Shorten the deadline so the timeout test is shorter. + data_table_read_rows_retry_tests._table_impl.default_read_rows_operation_timeout = ( + 10.0 + ) + rows_data = data_table_read_rows_retry_tests.read_rows( + retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) + ) with pytest.raises(exceptions.RetryError): - data_table_read_rows_retry_tests.read_rows( - retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) - ) + rows_data.consume_all() def test_table_check_and_mutate_rows(data_table, rows_to_delete): diff --git a/tests/unit/v2_client/test_row_data.py b/tests/unit/v2_client/test_row_data.py index e2f13be5e..11e4b0d65 100644 --- a/tests/unit/v2_client/test_row_data.py +++ b/tests/unit/v2_client/test_row_data.py @@ -13,10 +13,9 @@ # limitations under the License. 
-import mock import pytest -from ._testing import _make_credentials +from google.cloud.bigtable.data.row import Row, Cell TIMESTAMP_MICROS = 18738724000 # Make sure millis granularity ROW_KEY = b"row-key" @@ -24,6 +23,44 @@ QUALIFIER = b"qualifier" VALUE = b"value" TABLE_NAME = "table_name" +ROWS = [ + Row( + key=ROW_KEY, + cells=[ + Cell( + value=VALUE, + row_key=ROW_KEY, + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + ) + ], + ), + Row( + key=ROW_KEY + b"2", + cells=[ + Cell( + value=VALUE, + row_key=ROW_KEY + b"2", + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + ) + ], + ), + Row( + key=ROW_KEY + b"3", + cells=[ + Cell( + value=VALUE, + row_key=ROW_KEY + b"3", + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + ) + ], + ), +] def _make_cell(*args, **kwargs): @@ -65,6 +102,45 @@ def test_cell_from_pb_with_labels(): _cell_from_pb_test_helper(labels) +def test_cell__from_data_client_cell(): + from google.cloud.bigtable.data.row import Cell as DataCell + from google.cloud.bigtable.row_data import Cell + + data_cell = DataCell( + value=VALUE, + row_key=ROW_KEY, + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + ) + + cell = Cell._from_data_client_cell(data_cell) + + assert cell.value == VALUE + assert cell.timestamp_micros == TIMESTAMP_MICROS + assert cell.labels == [] + + +def test_cell__from_data_client_cell_with_labels(): + from google.cloud.bigtable.data.row import Cell as DataCell + from google.cloud.bigtable.row_data import Cell + + data_cell = DataCell( + value=VALUE, + row_key=ROW_KEY, + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + labels=["label1", "label2"], + ) + + cell = Cell._from_data_client_cell(data_cell) + + assert cell.value == VALUE + assert cell.timestamp_micros == TIMESTAMP_MICROS + assert cell.labels == ["label1", "label2"] + + def test_cell_constructor(): value = object() cell = _make_cell(value, TIMESTAMP_MICROS) @@ -112,6 +188,76 @@ def test_partial_row_data_constructor(): assert partial_row_data._cells == {} +def test_partial_row_data__from_data_client_row(): + from google.cloud.bigtable.data.row import Row as DataRow + from google.cloud.bigtable.data.row import Cell as DataCell + from google.cloud.bigtable.row_data import PartialRowData + + cells = [ + DataCell( + value=b"1", + row_key=ROW_KEY, + family="family1", + qualifier=b"qual1", + timestamp_micros=1, + ), + DataCell( + value=b"2", + row_key=ROW_KEY, + family="family1", + qualifier=b"qual1", + timestamp_micros=2, + ), + DataCell( + value=b"3", + row_key=ROW_KEY, + family="family1", + qualifier=b"qual2", + timestamp_micros=3, + ), + DataCell( + value=b"4", + row_key=ROW_KEY, + family="family2", + qualifier=b"qual1", + timestamp_micros=4, + ), + DataCell( + value=b"5", + row_key=ROW_KEY, + family="family2", + qualifier=b"qual2", + timestamp_micros=5, + ), + ] + + row = DataRow(ROW_KEY, cells) + partial_row_data = PartialRowData._from_data_client_row(row) + + expected_cells = { + "family1": { + b"qual1": [ + _make_cell(b"1", 1), + _make_cell(b"2", 2), + ], + b"qual2": [ + _make_cell(b"3", 3), + ], + }, + "family2": { + b"qual1": [ + _make_cell(b"4", 4), + ], + b"qual2": [ + _make_cell(b"5", 5), + ], + }, + } + + assert partial_row_data._row_key == ROW_KEY + assert partial_row_data._cells == expected_cells + + def test_partial_row_data___eq__(): row_key = object() partial_row_data1 = _make_partial_row_data(row_key) @@ -286,82 +432,6 @@ def 
trailing_metadata(self): return TestingException(exception) -def test__retry_read_rows_exception_miss(): - from google.api_core.exceptions import Conflict - from google.cloud.bigtable.row_data import _retry_read_rows_exception - - exception = Conflict("testing") - assert not _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_service_unavailable(): - from google.api_core.exceptions import ServiceUnavailable - from google.cloud.bigtable.row_data import _retry_read_rows_exception - - exception = ServiceUnavailable("testing") - assert _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_deadline_exceeded(): - from google.api_core.exceptions import DeadlineExceeded - from google.cloud.bigtable.row_data import _retry_read_rows_exception - - exception = DeadlineExceeded("testing") - assert _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_internal_server_not_retriable(): - from google.api_core.exceptions import InternalServerError - from google.cloud.bigtable.row_data import ( - _retry_read_rows_exception, - RETRYABLE_INTERNAL_ERROR_MESSAGES, - ) - - err_message = "500 Error" - exception = InternalServerError(err_message) - assert err_message not in RETRYABLE_INTERNAL_ERROR_MESSAGES - assert not _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_internal_server_retriable(): - from google.api_core.exceptions import InternalServerError - from google.cloud.bigtable.row_data import ( - _retry_read_rows_exception, - RETRYABLE_INTERNAL_ERROR_MESSAGES, - ) - - for err_message in RETRYABLE_INTERNAL_ERROR_MESSAGES: - exception = InternalServerError(err_message) - assert _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_miss_wrapped_in_grpc(): - from google.api_core.exceptions import Conflict - from google.cloud.bigtable.row_data import _retry_read_rows_exception - - wrapped = Conflict("testing") - exception = _make_grpc_call_error(wrapped) - assert not _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_service_unavailable_wrapped_in_grpc(): - from google.api_core.exceptions import ServiceUnavailable - from google.cloud.bigtable.row_data import _retry_read_rows_exception - - wrapped = ServiceUnavailable("testing") - exception = _make_grpc_call_error(wrapped) - assert _retry_read_rows_exception(exception) - - -def test__retry_read_rows_exception_deadline_exceeded_wrapped_in_grpc(): - from google.api_core.exceptions import DeadlineExceeded - from google.cloud.bigtable.row_data import _retry_read_rows_exception - - wrapped = DeadlineExceeded("testing") - exception = _make_grpc_call_error(wrapped) - assert _retry_read_rows_exception(exception) - - def test_partial_cell_data(): from google.cloud.bigtable.row_data import PartialCellData @@ -396,55 +466,12 @@ def _partial_rows_data_consume_all(yrd): return [row.row_key for row in yrd] -def _make_client(*args, **kwargs): - from google.cloud.bigtable.client import Client - - return Client(*args, **kwargs) - - -def test_partial_rows_data_constructor(): - from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS - - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - partial_rows_data = _make_partial_rows_data(client._data_stub.ReadRows, request) - assert partial_rows_data.request is request - assert partial_rows_data.rows == {} - assert partial_rows_data.retry == DEFAULT_RETRY_READ_ROWS +def _make_generator(rows): + return (row for row in rows) def 
test_partial_rows_data_consume_all(): - resp = _ReadRowsResponseV2( - [ - _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ), - _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY + b"2", - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ), - ] - ) - - call_count = 0 - iterator = _MockCancellableIterator(resp) - - def fake_read(*args, **kwargs): - nonlocal call_count - call_count += 1 - return iterator - - partial_rows_data = _make_partial_rows_data(fake_read, None) + partial_rows_data = _make_partial_rows_data(_make_generator(ROWS)) partial_rows_data.consume_all() row1 = _make_partial_row_data(ROW_KEY) @@ -455,771 +482,50 @@ def fake_read(*args, **kwargs): row2._cells[FAMILY_NAME] = { QUALIFIER: [_make_cell(value=VALUE, timestamp_micros=TIMESTAMP_MICROS)] } - - assert partial_rows_data.rows == {row1.row_key: row1, row2.row_key: row2} - - -def test_partial_rows_data_constructor_with_retry(): - from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS - - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - retry = DEFAULT_RETRY_READ_ROWS - partial_rows_data = _make_partial_rows_data( - client._data_stub.ReadRows, request, retry - ) - partial_rows_data.read_method.assert_called_once_with( - request, - timeout=DEFAULT_RETRY_READ_ROWS.deadline + 1, - retry=DEFAULT_RETRY_READ_ROWS, - ) - assert partial_rows_data.request is request - assert partial_rows_data.rows == {} - assert partial_rows_data.retry == retry - - -def test_partial_rows_data___eq__(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - partial_rows_data1 = _make_partial_rows_data(client._data_stub.ReadRows, request) - partial_rows_data2 = _make_partial_rows_data(client._data_stub.ReadRows, request) - assert partial_rows_data1.rows == partial_rows_data2.rows - - -def test_partial_rows_data___eq__type_differ(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - partial_rows_data1 = _make_partial_rows_data(client._data_stub.ReadRows, request) - partial_rows_data2 = object() - assert not (partial_rows_data1 == partial_rows_data2) - - -def test_partial_rows_data___ne__same_value(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - partial_rows_data1 = _make_partial_rows_data(client._data_stub.ReadRows, request) - partial_rows_data2 = _make_partial_rows_data(client._data_stub.ReadRows, request) - assert partial_rows_data1 != partial_rows_data2 - - -def test_partial_rows_data___ne__(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - partial_rows_data1 = _make_partial_rows_data(client._data_stub.ReadRows, request) - partial_rows_data2 = _make_partial_rows_data(client._data_stub.ReadRows, request) - assert partial_rows_data1 != partial_rows_data2 - - -def test_partial_rows_data_rows_getter(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - partial_rows_data = _make_partial_rows_data(client._data_stub.ReadRows, request) - partial_rows_data.rows = value = object() - assert partial_rows_data.rows is value - - -def test_partial_rows_data_state_start(): - client = _Client() - iterator = _MockCancellableIterator() - client._data_stub = mock.MagicMock() - client._data_stub.ReadRows.side_effect = [iterator] - request = object() - yrd = 
_make_partial_rows_data(client._data_stub.ReadRows, request) - assert yrd.state == yrd.NEW_ROW - - -def test_partial_rows_data_state_new_row_w_row(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunks = [chunk] - - response = _ReadRowsResponseV2(chunks) - iterator = _MockCancellableIterator(response) - - data_api = mock.create_autospec(BigtableClient) - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - client._table_data_client = data_api - request = object() - - yrd = _make_partial_rows_data(client._table_data_client.read_rows, request) - assert yrd.retry._deadline == 60.0 - - yrd.response_iterator = iterator - rows = [row for row in yrd] - - result = rows[0] - assert result.row_key == ROW_KEY - assert yrd._counter == 1 - assert yrd.state == yrd.NEW_ROW - - -def test_partial_rows_data_multiple_chunks(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - chunk1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=False, - ) - chunk2 = _ReadRowsResponseCellChunkPB( - qualifier=QUALIFIER + b"1", - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunks = [chunk1, chunk2] - - response = _ReadRowsResponseV2(chunks) - iterator = _MockCancellableIterator(response) - data_api = mock.create_autospec(BigtableClient) - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - client._table_data_client = data_api - request = object() - - yrd = _make_partial_rows_data(data_api.read_rows, request) - - yrd.response_iterator = iterator - rows = [row for row in yrd] - result = rows[0] - assert result.row_key == ROW_KEY - assert yrd._counter == 1 - assert yrd.state == yrd.NEW_ROW - - -def test_partial_rows_data_cancel(): - client = _Client() - response_iterator = _MockCancellableIterator() - client._data_stub = mock.MagicMock() - client._data_stub.ReadRows.side_effect = [response_iterator] - request = object() - yield_rows_data = _make_partial_rows_data(client._data_stub.ReadRows, request) - assert response_iterator.cancel_calls == 0 - yield_rows_data.cancel() - assert response_iterator.cancel_calls == 1 - assert list(yield_rows_data) == [] - - -def test_partial_rows_data_cancel_between_chunks(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - chunk1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunk2 = _ReadRowsResponseCellChunkPB( - qualifier=QUALIFIER + b"1", - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunks = [chunk1, chunk2] - response = _ReadRowsResponseV2(chunks) - response_iterator = _MockCancellableIterator(response) - - client = _Client() - data_api = mock.create_autospec(BigtableClient) - client._table_data_client = data_api - request = object() - yrd = _make_partial_rows_data(data_api.read_rows, request) - yrd.response_iterator = response_iterator - - rows = [] - for row in yrd: - yrd.cancel() - rows.append(row) - - assert response_iterator.cancel_calls == 1 - assert list(yrd) == [] - - -def 
test_partial_rows_data_valid_last_scanned_row_key_on_start(): - client = _Client() - response = _ReadRowsResponseV2([], last_scanned_row_key=b"2.AFTER") - iterator = _MockCancellableIterator(response) - client._data_stub = mock.MagicMock() - client._data_stub.read_rows.side_effect = [iterator] - request = object() - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - yrd.last_scanned_row_key = b"1.BEFORE" - _partial_rows_data_consume_all(yrd) - assert yrd.last_scanned_row_key == b"2.AFTER" - - -def test_partial_rows_data_invalid_empty_chunk(): - from google.cloud.bigtable.row_data import InvalidChunk - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - client = _Client() - chunks = _generate_cell_chunks([""]) - response = _ReadRowsResponseV2(chunks) - iterator = _MockCancellableIterator(response) - client._data_stub = mock.create_autospec(BigtableClient) - client._data_stub.read_rows.side_effect = [iterator] - request = object() - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - with pytest.raises(InvalidChunk): - _partial_rows_data_consume_all(yrd) - - -def test_partial_rows_data_state_cell_in_progress(): - labels = ["L1", "L2"] - resp = _ReadRowsResponseV2( - [ - _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - value_size=(2 * len(VALUE)), - labels=labels, - ), - _ReadRowsResponseCellChunkPB(value=VALUE, commit_row=True), - ] - ) - - def fake_read(*args, **kwargs): - return iter([resp]) - - yrd = _make_partial_rows_data(fake_read, None) - yrd.consume_all() - - expected_row = _make_partial_row_data(ROW_KEY) - expected_row._cells = { - QUALIFIER: [ - _make_cell( - value=(VALUE + VALUE), timestamp_micros=TIMESTAMP_MICROS, labels=labels - ) - ] + row3 = _make_partial_row_data(ROW_KEY + b"3") + row3._cells[FAMILY_NAME] = { + QUALIFIER: [_make_cell(value=VALUE, timestamp_micros=TIMESTAMP_MICROS)] } - -def test_partial_rows_data_yield_rows_data(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - client = _Client() - - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunks = [chunk] - - response = _ReadRowsResponseV2(chunks) - iterator = _MockCancellableIterator(response) - data_api = mock.create_autospec(BigtableClient) - client._data_stub = data_api - client._data_stub.read_rows.side_effect = [iterator] - - request = object() - - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - - result = _partial_rows_data_consume_all(yrd)[0] - - assert result == ROW_KEY - - -def test_partial_rows_data_yield_retry_rows_data(): - from google.api_core import retry - - client = _Client() - - retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception) - - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunks = [chunk] - - response = _ReadRowsResponseV2(chunks) - failure_iterator = _MockFailureIterator_1() - iterator = _MockCancellableIterator(response) - client._data_stub = mock.MagicMock() - client._data_stub.ReadRows.side_effect = [failure_iterator, iterator] - - request = object() - - yrd = _make_partial_rows_data(client._data_stub.ReadRows, request, retry_read_rows) - - result = _partial_rows_data_consume_all(yrd)[0] - - 
assert result == ROW_KEY - - -def _make_read_rows_request_manager(*args, **kwargs): - from google.cloud.bigtable.row_data import _ReadRowsRequestManager - - return _ReadRowsRequestManager(*args, **kwargs) - - -@pytest.fixture(scope="session") -def rrrm_data(): - from google.cloud.bigtable import row_set - - row_range1 = row_set.RowRange(b"row_key21", b"row_key29") - row_range2 = row_set.RowRange(b"row_key31", b"row_key39") - row_range3 = row_set.RowRange(b"row_key41", b"row_key49") - - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - request.rows.row_ranges.append(row_range2.get_range_kwargs()) - request.rows.row_ranges.append(row_range3.get_range_kwargs()) - - yield { - "row_range1": row_range1, - "row_range2": row_range2, - "row_range3": row_range3, - "request": request, + assert partial_rows_data.rows == { + row1.row_key: row1, + row2.row_key: row2, + row3.row_key: row3, } -def test_RRRM_constructor(): - request = mock.Mock() - last_scanned_key = "last_key" - rows_read_so_far = 10 - - request_manager = _make_read_rows_request_manager( - request, last_scanned_key, rows_read_so_far - ) - assert request == request_manager.message - assert last_scanned_key == request_manager.last_scanned_key - assert rows_read_so_far == request_manager.rows_read_so_far - - -def test_RRRM__filter_row_key(): - table_name = "table_name" - request = _ReadRowsRequestPB(table_name=table_name) - request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key3", b"row_key4"]) - - last_scanned_key = b"row_key2" - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - row_keys = request_manager._filter_rows_keys() - - expected_row_keys = [b"row_key3", b"row_key4"] - assert expected_row_keys == row_keys - - -def test_RRRM__filter_row_key_is_empty(): - table_name = "table_name" - request = _ReadRowsRequestPB(table_name=table_name) - request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key3", b"row_key4"]) - - last_scanned_key = b"row_key4" - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 4) - row_keys = request_manager._filter_rows_keys() - - assert row_keys == [] - - -def test_RRRM__filter_row_ranges_all_ranges_added_back(rrrm_data): - from google.cloud.bigtable_v2.types import data as data_v2_pb2 - - request = rrrm_data["request"] - last_scanned_key = b"row_key14" - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - row_ranges = request_manager._filter_row_ranges() - - exp_row_range1 = data_v2_pb2.RowRange( - start_key_closed=b"row_key21", end_key_open=b"row_key29" - ) - exp_row_range2 = data_v2_pb2.RowRange( - start_key_closed=b"row_key31", end_key_open=b"row_key39" - ) - exp_row_range3 = data_v2_pb2.RowRange( - start_key_closed=b"row_key41", end_key_open=b"row_key49" - ) - exp_row_ranges = [exp_row_range1, exp_row_range2, exp_row_range3] - - assert exp_row_ranges == row_ranges - - -def test_RRRM__filter_row_ranges_all_ranges_already_read(rrrm_data): - request = rrrm_data["request"] - last_scanned_key = b"row_key54" - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - row_ranges = request_manager._filter_row_ranges() - - assert row_ranges == [] - - -def test_RRRM__filter_row_ranges_all_ranges_already_read_open_closed(): - from google.cloud.bigtable import row_set - - last_scanned_key = b"row_key54" - - row_range1 = row_set.RowRange(b"row_key21", b"row_key29", False, True) - row_range2 = row_set.RowRange(b"row_key31", 
b"row_key39") - row_range3 = row_set.RowRange(b"row_key41", b"row_key49", False, True) - - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - request.rows.row_ranges.append(row_range2.get_range_kwargs()) - request.rows.row_ranges.append(row_range3.get_range_kwargs()) - - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - request_manager.new_message = _ReadRowsRequestPB(table_name=TABLE_NAME) - row_ranges = request_manager._filter_row_ranges() - - assert row_ranges == [] - - -def test_RRRM__filter_row_ranges_some_ranges_already_read(rrrm_data): - from google.cloud.bigtable_v2.types import data as data_v2_pb2 - - request = rrrm_data["request"] - last_scanned_key = b"row_key22" - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - request_manager.new_message = _ReadRowsRequestPB(table_name=TABLE_NAME) - row_ranges = request_manager._filter_row_ranges() - - exp_row_range1 = data_v2_pb2.RowRange( - start_key_open=b"row_key22", end_key_open=b"row_key29" - ) - exp_row_range2 = data_v2_pb2.RowRange( - start_key_closed=b"row_key31", end_key_open=b"row_key39" - ) - exp_row_range3 = data_v2_pb2.RowRange( - start_key_closed=b"row_key41", end_key_open=b"row_key49" - ) - exp_row_ranges = [exp_row_range1, exp_row_range2, exp_row_range3] - - assert exp_row_ranges == row_ranges - - -def test_RRRM_build_updated_request(rrrm_data): - from google.cloud.bigtable.row_filters import RowSampleFilter - from google.cloud.bigtable_v2 import types - - row_range1 = rrrm_data["row_range1"] - row_filter = RowSampleFilter(0.33) - last_scanned_key = b"row_key25" - request = _ReadRowsRequestPB( - filter=row_filter._to_pb(), - rows_limit=8, - table_name=TABLE_NAME, - app_profile_id="app-profile-id-1", - ) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - - result = request_manager.build_updated_request() - - expected_result = _ReadRowsRequestPB( - table_name=TABLE_NAME, - filter=row_filter._to_pb(), - rows_limit=6, - app_profile_id="app-profile-id-1", - ) - - row_range1 = types.RowRange( - start_key_open=last_scanned_key, end_key_open=row_range1.end_key - ) - expected_result.rows.row_ranges.append(row_range1) - - assert expected_result == result - - -def test_RRRM_build_updated_request_full_table(): - from google.cloud.bigtable_v2 import types - - last_scanned_key = b"row_key14" - - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - - result = request_manager.build_updated_request() - expected_result = _ReadRowsRequestPB(table_name=TABLE_NAME) - row_range1 = types.RowRange(start_key_open=last_scanned_key) - expected_result.rows.row_ranges.append(row_range1) - assert expected_result == result - - -def test_RRRM_build_updated_request_no_start_key(): - from google.cloud.bigtable.row_filters import RowSampleFilter - from google.cloud.bigtable_v2 import types - - row_filter = RowSampleFilter(0.33) - last_scanned_key = b"row_key25" - request = _ReadRowsRequestPB( - filter=row_filter._to_pb(), rows_limit=8, table_name=TABLE_NAME - ) - row_range1 = types.RowRange(end_key_open=b"row_key29") - request.rows.row_ranges.append(row_range1) - - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - - result = request_manager.build_updated_request() - - expected_result = _ReadRowsRequestPB( - 
table_name=TABLE_NAME, filter=row_filter._to_pb(), rows_limit=6 - ) - - row_range2 = types.RowRange( - start_key_open=last_scanned_key, end_key_open=b"row_key29" - ) - expected_result.rows.row_ranges.append(row_range2) - - assert expected_result == result - - -def test_RRRM_build_updated_request_no_end_key(): - from google.cloud.bigtable.row_filters import RowSampleFilter - from google.cloud.bigtable_v2 import types - - row_filter = RowSampleFilter(0.33) - last_scanned_key = b"row_key25" - request = _ReadRowsRequestPB( - filter=row_filter._to_pb(), rows_limit=8, table_name=TABLE_NAME - ) - - row_range1 = types.RowRange(start_key_closed=b"row_key20") - request.rows.row_ranges.append(row_range1) - - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - - result = request_manager.build_updated_request() - - expected_result = _ReadRowsRequestPB( - table_name=TABLE_NAME, filter=row_filter._to_pb(), rows_limit=6 - ) - row_range2 = types.RowRange(start_key_open=last_scanned_key) - expected_result.rows.row_ranges.append(row_range2) - - assert expected_result == result - - -def test_RRRM_build_updated_request_rows(): - from google.cloud.bigtable.row_filters import RowSampleFilter - - row_filter = RowSampleFilter(0.33) - last_scanned_key = b"row_key4" - request = _ReadRowsRequestPB( - filter=row_filter._to_pb(), rows_limit=5, table_name=TABLE_NAME - ) - request.rows.row_keys.extend( - [b"row_key1", b"row_key2", b"row_key4", b"row_key5", b"row_key7", b"row_key9"] - ) - - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 3) - - result = request_manager.build_updated_request() - - expected_result = _ReadRowsRequestPB( - table_name=TABLE_NAME, filter=row_filter._to_pb(), rows_limit=2 - ) - expected_result.rows.row_keys.extend([b"row_key5", b"row_key7", b"row_key9"]) - - assert expected_result == result - - -def test_RRRM_build_updated_request_rows_limit(): - from google.cloud.bigtable_v2 import types - - last_scanned_key = b"row_key14" - - request = _ReadRowsRequestPB(table_name=TABLE_NAME, rows_limit=10) - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - - result = request_manager.build_updated_request() - expected_result = _ReadRowsRequestPB(table_name=TABLE_NAME, rows_limit=8) - row_range1 = types.RowRange(start_key_open=last_scanned_key) - expected_result.rows.row_ranges.append(row_range1) - assert expected_result == result - - -def test_RRRM__key_already_read(): - last_scanned_key = b"row_key14" - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request_manager = _make_read_rows_request_manager(request, last_scanned_key, 2) - - assert request_manager._key_already_read(b"row_key11") - assert not request_manager._key_already_read(b"row_key16") - - -def test_RRRM__rows_limit_reached(): - from google.cloud.bigtable.row_data import InvalidRetryRequest - - last_scanned_key = b"row_key14" - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request.rows_limit = 2 - request_manager = _make_read_rows_request_manager( - request, last_scanned_key=last_scanned_key, rows_read_so_far=2 - ) - with pytest.raises(InvalidRetryRequest): - request_manager.build_updated_request() - - -def test_RRRM_build_updated_request_last_row_read_raises_invalid_retry_request(): - from google.cloud.bigtable.row_data import InvalidRetryRequest - - last_scanned_key = b"row_key4" - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key4"]) - - request_manager = 
_make_read_rows_request_manager( - request, last_scanned_key, rows_read_so_far=3 - ) - with pytest.raises(InvalidRetryRequest): - request_manager.build_updated_request() - - -def test_RRRM_build_updated_request_row_ranges_read_raises_invalid_retry_request(): - from google.cloud.bigtable.row_data import InvalidRetryRequest - from google.cloud.bigtable import row_set - - row_range1 = row_set.RowRange(b"row_key21", b"row_key29") - - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - - last_scanned_key = b"row_key4" - request = _ReadRowsRequestPB( - table_name=TABLE_NAME, - ) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - - request_manager = _make_read_rows_request_manager( - request, last_scanned_key, rows_read_so_far=2 - ) - with pytest.raises(InvalidRetryRequest): - request_manager.build_updated_request() - - -def test_RRRM_build_updated_request_row_ranges_valid(): - from google.cloud.bigtable import row_set - - row_range1 = row_set.RowRange(b"row_key21", b"row_key29") - - request = _ReadRowsRequestPB(table_name=TABLE_NAME) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - - last_scanned_key = b"row_key21" - request = _ReadRowsRequestPB( - table_name=TABLE_NAME, - ) - request.rows.row_ranges.append(row_range1.get_range_kwargs()) - - request_manager = _make_read_rows_request_manager( - request, last_scanned_key, rows_read_so_far=1 - ) - updated_request = request_manager.build_updated_request() - assert len(updated_request.rows.row_ranges) > 0 - - -class _MockCancellableIterator(object): - cancel_calls = 0 - - def __init__(self, *values): - self.iter_values = iter(values) - self.last_scanned_row_key = "" - - def cancel(self): - self.cancel_calls += 1 - - def next(self): - return next(self.iter_values) - - __next__ = next - - -class _MockFailureIterator_1(object): - def next(self): - from google.api_core.exceptions import DeadlineExceeded - - raise DeadlineExceeded("Failed to read from server") - - __next__ = next - - -def _ReadRowsResponseV2(chunks, last_scanned_row_key=b""): - from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 - - return messages_v2_pb2.ReadRowsResponse( - chunks=chunks, last_scanned_row_key=last_scanned_row_key - ) - - -def _generate_cell_chunks(chunk_text_pbs): - from google.protobuf.text_format import Merge - from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse - - chunks = [] - - for chunk_text_pb in chunk_text_pbs: - chunk = ReadRowsResponse.CellChunk() - chunk._pb = Merge(chunk_text_pb, chunk._pb) - chunks.append(chunk) +def test_partial_rows_data_cancel(): + partial_rows_data = _make_partial_rows_data(_make_generator(ROWS)) + row_data = [] - return chunks + count = 0 + for row in partial_rows_data: + row_data.append(row) + count += 1 + if count == 1: + partial_rows_data.cancel() + row1 = _make_partial_row_data(ROW_KEY) + row1._cells[FAMILY_NAME] = { + QUALIFIER: [_make_cell(value=VALUE, timestamp_micros=TIMESTAMP_MICROS)] + } + assert row_data == [row1] -def _ReadRowsResponseCellChunkPB(*args, **kw): - from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 - family_name = kw.pop("family_name", None) - qualifier = kw.pop("qualifier", None) - message = messages_v2_pb2.ReadRowsResponse.CellChunk(*args, **kw) +def test_partial_rows_data_deadline_exceeded(): + from google.api_core import exceptions - if family_name: - message.family_name = family_name - if qualifier: - message.qualifier = qualifier + def 
generator_w_error(): + raise exceptions.DeadlineExceeded("Boom") + yield 1 - return message + partial_rows_data = _make_partial_rows_data(generator_w_error()) + with pytest.raises(exceptions.RetryError): + list(partial_rows_data) + assert partial_rows_data._cancelled def _make_cell_pb(value): from google.cloud.bigtable import row_data return row_data.Cell(value, TIMESTAMP_MICROS) - - -def _ReadRowsRequestPB(*args, **kw): - from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 - - return messages_v2_pb2.ReadRowsRequest(*args, **kw) - - -def _read_rows_retry_exception(exc): - from google.api_core.exceptions import DeadlineExceeded - - return isinstance(exc, DeadlineExceeded) - - -class _Client(object): - data_stub = None diff --git a/tests/unit/v2_client/test_row_merger.py b/tests/unit/v2_client/test_row_merger.py index 483c04536..deac16a7f 100644 --- a/tests/unit/v2_client/test_row_merger.py +++ b/tests/unit/v2_client/test_row_merger.py @@ -1,13 +1,9 @@ import os -from itertools import zip_longest -from typing import List import proto -import pytest -from google.cloud.bigtable.row_data import PartialRowsData, PartialRowData, InvalidChunk +from google.cloud.bigtable.row_data import PartialRowData from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse -from google.cloud.bigtable.row_merger import _RowMerger # TODO: autogenerate protos from @@ -59,172 +55,3 @@ def extract_results_from_row(row: PartialRowData): ) ) return results - - -@pytest.mark.parametrize( - "test_case", parse_readrows_acceptance_tests(), ids=lambda t: t.description -) -def test_scenario(test_case: ReadRowsTest): - def fake_read(*args, **kwargs): - return iter([ReadRowsResponse(chunks=test_case.chunks)]) - - actual_results: List[ReadRowsTest.Result] = [] - try: - for row in PartialRowsData(fake_read, request=None): - actual_results.extend(extract_results_from_row(row)) - except (InvalidChunk, ValueError): - actual_results.append(ReadRowsTest.Result(error=True)) - - for expected, actual in zip_longest(test_case.results, actual_results): - assert actual == expected - - -def test_out_of_order_rows(): - row_merger = _RowMerger(last_seen_row=b"z") - with pytest.raises(InvalidChunk): - list(row_merger.process_chunks(ReadRowsResponse(last_scanned_row_key=b"a"))) - - -def test_bare_reset(): - first_chunk = ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk( - row_key=b"a", family_name="f", qualifier=b"q", value=b"v" - ) - ) - with pytest.raises(InvalidChunk): - _process_chunks( - first_chunk, - ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk(reset_row=True, row_key=b"a") - ), - ) - with pytest.raises(InvalidChunk): - _process_chunks( - first_chunk, - ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk(reset_row=True, family_name="f") - ), - ) - with pytest.raises(InvalidChunk): - _process_chunks( - first_chunk, - ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk(reset_row=True, qualifier=b"q") - ), - ) - with pytest.raises(InvalidChunk): - _process_chunks( - first_chunk, - ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk(reset_row=True, timestamp_micros=1000) - ), - ) - with pytest.raises(InvalidChunk): - _process_chunks( - first_chunk, - ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk(reset_row=True, labels=["a"]) - ), - ) - with pytest.raises(InvalidChunk): - _process_chunks( - first_chunk, - ReadRowsResponse.CellChunk( - ReadRowsResponse.CellChunk(reset_row=True, value=b"v") - ), - ) - - -def test_missing_family(): - with pytest.raises(InvalidChunk): - 
_process_chunks( - ReadRowsResponse.CellChunk( - row_key=b"a", - qualifier=b"q", - timestamp_micros=1000, - value=b"v", - commit_row=True, - ) - ) - - -def test_mid_cell_row_key_change(): - with pytest.raises(InvalidChunk): - _process_chunks( - ReadRowsResponse.CellChunk( - row_key=b"a", - family_name="f", - qualifier=b"q", - timestamp_micros=1000, - value_size=2, - value=b"v", - ), - ReadRowsResponse.CellChunk(row_key=b"b", value=b"v", commit_row=True), - ) - - -def test_mid_cell_family_change(): - with pytest.raises(InvalidChunk): - _process_chunks( - ReadRowsResponse.CellChunk( - row_key=b"a", - family_name="f", - qualifier=b"q", - timestamp_micros=1000, - value_size=2, - value=b"v", - ), - ReadRowsResponse.CellChunk(family_name="f2", value=b"v", commit_row=True), - ) - - -def test_mid_cell_qualifier_change(): - with pytest.raises(InvalidChunk): - _process_chunks( - ReadRowsResponse.CellChunk( - row_key=b"a", - family_name="f", - qualifier=b"q", - timestamp_micros=1000, - value_size=2, - value=b"v", - ), - ReadRowsResponse.CellChunk(qualifier=b"q2", value=b"v", commit_row=True), - ) - - -def test_mid_cell_timestamp_change(): - with pytest.raises(InvalidChunk): - _process_chunks( - ReadRowsResponse.CellChunk( - row_key=b"a", - family_name="f", - qualifier=b"q", - timestamp_micros=1000, - value_size=2, - value=b"v", - ), - ReadRowsResponse.CellChunk( - timestamp_micros=2000, value=b"v", commit_row=True - ), - ) - - -def test_mid_cell_labels_change(): - with pytest.raises(InvalidChunk): - _process_chunks( - ReadRowsResponse.CellChunk( - row_key=b"a", - family_name="f", - qualifier=b"q", - timestamp_micros=1000, - value_size=2, - value=b"v", - ), - ReadRowsResponse.CellChunk(labels=["b"], value=b"v", commit_row=True), - ) - - -def _process_chunks(*chunks): - req = ReadRowsResponse.pb(ReadRowsResponse(chunks=chunks)) - return list(_RowMerger().process_chunks(req)) diff --git a/tests/unit/v2_client/test_row_set.py b/tests/unit/v2_client/test_row_set.py index 1964922bd..5eee880ff 100644 --- a/tests/unit/v2_client/test_row_set.py +++ b/tests/unit/v2_client/test_row_set.py @@ -168,28 +168,6 @@ def test_row_set_add_row_range_with_prefix(): assert row_set.row_ranges[0].end_key == b"rox" -def test_row_set__update_message_request(): - from google.cloud._helpers import _to_bytes - from google.cloud.bigtable.row_set import RowRange - from google.cloud.bigtable.row_set import RowSet - - row_set = RowSet() - table_name = "table_name" - row_set.add_row_key("row_key1") - row_range1 = RowRange(b"row_key21", b"row_key29") - row_set.add_row_range(row_range1) - - request = _ReadRowsRequestPB(table_name=table_name) - row_set._update_message_request(request) - - expected_request = _ReadRowsRequestPB(table_name=table_name) - expected_request.rows.row_keys.append(_to_bytes("row_key1")) - - expected_request.rows.row_ranges.append(row_range1.get_range_kwargs()) - - assert request == expected_request - - def test_row_range_constructor(): from google.cloud.bigtable.row_set import RowRange diff --git a/tests/unit/v2_client/test_table.py b/tests/unit/v2_client/test_table.py index 1ddeba28d..568f336c2 100644 --- a/tests/unit/v2_client/test_table.py +++ b/tests/unit/v2_client/test_table.py @@ -20,6 +20,7 @@ from grpc import StatusCode from google.api_core.exceptions import DeadlineExceeded +from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from ._testing import _make_credentials PROJECT_ID = "project-id" @@ -583,135 +584,6 @@ def _make_gapic_api(client): return gapic_client_mock -def 
_table_read_row_helper(chunks, expected_result, app_profile_id=None): - from google.cloud._testing import _Monkey - from google.cloud.bigtable import table as MUT - from google.cloud.bigtable.row_set import RowSet - from google.cloud.bigtable.row_filters import RowSampleFilter - from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance, app_profile_id=app_profile_id) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create response_iterator - if chunks is None: - response_iterator = iter(()) # no responses at all - else: - response_pb = _ReadRowsResponsePB(chunks=chunks) - response_iterator = iter([response_pb]) - - gapic_api = _make_gapic_api(client) - gapic_api.read_rows.return_value = response_iterator - - filter_obj = RowSampleFilter(0.33) - - with _Monkey(MUT, _create_row_request=mock_create_row_request): - result = table.read_row(ROW_KEY, filter_=filter_obj) - - row_set = RowSet() - row_set.add_row_key(ROW_KEY) - expected_request = [ - ( - table.name, - { - "end_inclusive": False, - "row_set": row_set, - "app_profile_id": app_profile_id, - "end_key": None, - "limit": None, - "start_key": None, - "filter_": filter_obj, - }, - ) - ] - assert result == expected_result - assert mock_created == expected_request - - gapic_api.read_rows.assert_called_once_with( - request_pb, timeout=61.0, retry=DEFAULT_RETRY_READ_ROWS - ) - - -def test_table_read_row_miss_no__responses(): - _table_read_row_helper(None, None) - - -def test_table_read_row_miss_no_chunks_in_response(): - chunks = [] - _table_read_row_helper(chunks, None) - - -def test_table_read_row_complete(): - from google.cloud.bigtable.row_data import Cell - from google.cloud.bigtable.row_data import PartialRowData - - app_profile_id = "app-profile-id" - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - chunks = [chunk] - expected_result = PartialRowData(row_key=ROW_KEY) - family = expected_result._cells.setdefault(FAMILY_NAME, {}) - column = family.setdefault(QUALIFIER, []) - column.append(Cell.from_pb(chunk)) - - _table_read_row_helper(chunks, expected_result, app_profile_id) - - -def test_table_read_row_more_than_one_row_returned(): - app_profile_id = "app-profile-id" - chunk_1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - )._pb - chunk_2 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_2, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - )._pb - - chunks = [chunk_1, chunk_2] - - with pytest.raises(ValueError): - _table_read_row_helper(chunks, None, app_profile_id) - - -def test_table_read_row_still_partial(): - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - ) - chunks = [chunk] # No "commit row". 
- - with pytest.raises(ValueError): - _table_read_row_helper(chunks, None) - - def _table_mutate_rows_helper( mutation_timeout=None, app_profile_id=None, @@ -905,38 +777,61 @@ def test_table_mutate_rows_w_mutation_timeout_and_timeout_arg(): def test_table_read_rows(): - from google.cloud._testing import _Monkey + from google.cloud.bigtable.data._helpers import TABLE_DEFAULT + from google.cloud.bigtable.data.row import Row, Cell + from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery + from google.cloud.bigtable.row import PartialRowData + from google.cloud.bigtable.row import Cell as PartialRowDataCell from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import table as MUT from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS + from google.cloud.bigtable.row_set import RowRange credentials = _make_credentials() client = _make_client(project="project-id", credentials=credentials, admin=True) - gapic_api = _make_gapic_api(client) instance = client.instance(instance_id=INSTANCE_ID) app_profile_id = "app-profile-id" table = _make_table(TABLE_ID, instance, app_profile_id=app_profile_id) - # Create request_pb - request_pb = object() # Returned by our mock. - retry = DEFAULT_RETRY_READ_ROWS - mock_created = [] + # Create read_rows return value + rows = [ + Row( + key=ROW_KEY, + cells=[ + Cell( + value=VALUE, + row_key=ROW_KEY, + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + ) + ], + ), + Row( + key=ROW_KEY_1, + cells=[ + Cell( + value=VALUE, + row_key=ROW_KEY_1, + family=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + ) + ], + ), + ] + generator = (r for r in rows) - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create expected_result. - expected_result = PartialRowsData( - client._table_data_client._gapic_client.transport.read_rows, request_pb, retry - ) + # Create expected result. + expected_result = PartialRowsData(generator) # Perform the method and check the result. 
- start_key = b"start-key" + start_key = b"begin-key" end_key = b"end-key" filter_obj = object() limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): + retry = DEFAULT_RETRY_READ_ROWS + with mock.patch.object(table._table_impl, "read_rows_stream") as read_rows_mock: + read_rows_mock.return_value = generator result = table.read_rows( start_key=start_key, end_key=end_key, @@ -946,265 +841,42 @@ def mock_create_row_request(table_name, **kwargs): ) assert result.rows == expected_result.rows - assert result.retry == expected_result.retry - created_kwargs = { - "start_key": start_key, - "end_key": end_key, - "filter_": filter_obj, - "limit": limit, - "end_inclusive": False, - "app_profile_id": app_profile_id, - "row_set": None, - } - assert mock_created == [(table.name, created_kwargs)] - - gapic_api.read_rows.assert_called_once_with(request_pb, timeout=61.0, retry=retry) - - -def test_table_read_retry_rows(): - from google.api_core import retry + assert result._generator == expected_result._generator - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - gapic_api = _make_gapic_api(client) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - - retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception) - - # Create response_iterator - chunk_1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_1, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - chunk_2 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_2, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, + expected_read_rows_query = ReadRowsQuery( + row_ranges=RowRange(start_key=start_key, end_key=end_key), + row_filter=filter_obj, + limit=limit, ) - response_1 = _ReadRowsResponseV2([chunk_1]) - response_2 = _ReadRowsResponseV2([chunk_2]) - response_failure_iterator_1 = _MockFailureIterator_1() - response_failure_iterator_2 = _MockFailureIterator_2([response_1]) - response_iterator = _MockReadRowsIterator(response_2) - - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" + read_rows_mock.assert_called_once_with( + expected_read_rows_query, + operation_timeout=TABLE_DEFAULT.READ_ROWS, + attempt_timeout=retry.deadline, + retryable_errors=TABLE_DEFAULT.READ_ROWS, ) - gapic_api.read_rows.side_effect = [ - response_failure_iterator_1, - response_failure_iterator_2, - response_iterator, - ] - - rows = [ - row - for row in table.read_rows( - start_key=ROW_KEY_1, end_key=ROW_KEY_2, retry=retry_read_rows - ) - ] - - result = rows[1] - assert result.row_key == ROW_KEY_2 - - assert len(gapic_api.read_rows.mock_calls) == 3 - - -def test_table_read_retry_rows_no_full_table_scan(): - from google.api_core import retry - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - gapic_api = _make_gapic_api(client) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - - retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception) - - # Create response_iterator - chunk_1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_2, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - response_1 = _ReadRowsResponseV2([chunk_1]) - 
response_failure_iterator_2 = _MockFailureIterator_2([response_1]) - - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" - ) - - gapic_api.read_rows.side_effect = [ - response_failure_iterator_2, - ] - - rows = [ - row - for row in table.read_rows( - start_key="doesn't matter", end_key=ROW_KEY_2, retry=retry_read_rows - ) - ] - assert len(rows) == 1 - result = rows[0] - assert result.row_key == ROW_KEY_2 - - assert len(gapic_api.read_rows.mock_calls) == 1 - assert ( - len(gapic_api.read_rows.mock_calls[0].args[0].rows.row_ranges) > 0 - ) # not empty row_ranges - - -def test_table_yield_retry_rows(): - from google.cloud.bigtable.table import _create_row_request - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - - # Create response_iterator - chunk_1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_1, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - chunk_2 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_2, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - response_1 = _ReadRowsResponseV2([chunk_1]) - response_2 = _ReadRowsResponseV2([chunk_2]) - response_failure_iterator_1 = _MockFailureIterator_1() - response_failure_iterator_2 = _MockFailureIterator_2([response_1]) - response_iterator = _MockReadRowsIterator(response_2) - - gapic_api = _make_gapic_api(client) - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" - ) - gapic_api.read_rows.side_effect = [ - response_failure_iterator_1, - response_failure_iterator_2, - response_iterator, - ] - - rows = [] - with warnings.catch_warnings(record=True) as warned: - for row in table.yield_rows(start_key=ROW_KEY_1, end_key=ROW_KEY_2): - rows.append(row) - - assert len(warned) >= 1 - assert DeprecationWarning in [w.category for w in warned] - - result = rows[1] - assert result.row_key == ROW_KEY_2 - - expected_request = _create_row_request( - table.name, - start_key=ROW_KEY_1, - end_key=ROW_KEY_2, - ) - gapic_api.read_rows.mock_calls = [expected_request] * 3 - - -def test_table_yield_rows_with_row_set(): - from google.cloud.bigtable.row_set import RowSet - from google.cloud.bigtable.row_set import RowRange - from google.cloud.bigtable.table import _create_row_request - from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - - # Create response_iterator - chunk_1 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_1, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - chunk_2 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_2, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - chunk_3 = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY_3, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - commit_row=True, - ) - - response_1 = _ReadRowsResponseV2([chunk_1]) - response_2 = 
_ReadRowsResponseV2([chunk_2]) - response_3 = _ReadRowsResponseV2([chunk_3]) - response_iterator = _MockReadRowsIterator(response_1, response_2, response_3) - - gapic_api = _make_gapic_api(client) - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" - ) - gapic_api.read_rows.side_effect = [response_iterator] - - rows = [] - row_set = RowSet() - row_set.add_row_range(RowRange(start_key=ROW_KEY_1, end_key=ROW_KEY_2)) - row_set.add_row_key(ROW_KEY_3) - - with warnings.catch_warnings(record=True) as warned: - for row in table.yield_rows(row_set=row_set): - rows.append(row) - - assert len(warned) >= 1 - assert DeprecationWarning in [w.category for w in warned] - - assert rows[0].row_key == ROW_KEY_1 - assert rows[1].row_key == ROW_KEY_2 - assert rows[2].row_key == ROW_KEY_3 + # Test that the correct rows get returned. + partial_row_data = PartialRowData(ROW_KEY) + partial_row_data._cells = { + FAMILY_NAME: { + QUALIFIER: [ + PartialRowDataCell(value=VALUE, timestamp_micros=TIMESTAMP_MICROS), + ], + }, + } + partial_row_data_1 = PartialRowData(ROW_KEY_1) + partial_row_data_1._cells = { + FAMILY_NAME: { + QUALIFIER: [ + PartialRowDataCell(value=VALUE, timestamp_micros=TIMESTAMP_MICROS), + ], + }, + } + expected_row_data = [partial_row_data, partial_row_data_1] - expected_request = _create_row_request( - table.name, - start_key=ROW_KEY_1, - end_key=ROW_KEY_2, - ) - expected_request.rows.row_keys.append(ROW_KEY_3) - gapic_api.read_rows.assert_called_once_with( - expected_request, timeout=61.0, retry=DEFAULT_RETRY_READ_ROWS - ) + row_data = list(result) + assert row_data == expected_row_data def test_table_sample_row_keys(): @@ -1559,15 +1231,6 @@ def test_table_restore_table_w_backup_name(): _table_restore_helper(backup_name=BACKUP_NAME) -def test__create_row_request_table_name_only(): - from google.cloud.bigtable.table import _create_row_request - - table_name = "table_name" - result = _create_row_request(table_name) - expected_result = _ReadRowsRequestPB(table_name=table_name) - assert result == expected_result - - def test__create_row_request_row_range_row_set_conflict(): from google.cloud.bigtable.table import _create_row_request @@ -1579,12 +1242,9 @@ def test__create_row_request_row_range_start_key(): from google.cloud.bigtable.table import _create_row_request from google.cloud.bigtable_v2.types import RowRange - table_name = "table_name" start_key = b"begin_key" - result = _create_row_request(table_name, start_key=start_key) - expected_result = _ReadRowsRequestPB(table_name=table_name) - row_range = RowRange(start_key_closed=start_key) - expected_result.rows.row_ranges.append(row_range) + result = _create_row_request(start_key=start_key) + expected_result = ReadRowsQuery(row_ranges=RowRange(start_key_closed=start_key)) assert result == expected_result @@ -1592,12 +1252,9 @@ def test__create_row_request_row_range_end_key(): from google.cloud.bigtable.table import _create_row_request from google.cloud.bigtable_v2.types import RowRange - table_name = "table_name" - end_key = b"end_key" - result = _create_row_request(table_name, end_key=end_key) - expected_result = _ReadRowsRequestPB(table_name=table_name) - row_range = RowRange(end_key_open=end_key) - expected_result.rows.row_ranges.append(row_range) + end_key = b"begin_key" + result = _create_row_request(end_key=end_key) + expected_result = ReadRowsQuery(row_ranges=RowRange(end_key_open=end_key)) assert result == expected_result @@ -1605,13 +1262,12 @@ def 
test__create_row_request_row_range_both_keys(): from google.cloud.bigtable.table import _create_row_request from google.cloud.bigtable_v2.types import RowRange - table_name = "table_name" start_key = b"begin_key" end_key = b"end_key" - result = _create_row_request(table_name, start_key=start_key, end_key=end_key) - row_range = RowRange(start_key_closed=start_key, end_key_open=end_key) - expected_result = _ReadRowsRequestPB(table_name=table_name) - expected_result.rows.row_ranges.append(row_range) + result = _create_row_request(start_key=start_key, end_key=end_key) + expected_result = ReadRowsQuery( + row_ranges=RowRange(start_key_closed=start_key, end_key_open=end_key) + ) assert result == expected_result @@ -1619,15 +1275,14 @@ def test__create_row_request_row_range_both_keys_inclusive(): from google.cloud.bigtable.table import _create_row_request from google.cloud.bigtable_v2.types import RowRange - table_name = "table_name" start_key = b"begin_key" end_key = b"end_key" result = _create_row_request( - table_name, start_key=start_key, end_key=end_key, end_inclusive=True + start_key=start_key, end_key=end_key, end_inclusive=True + ) + expected_result = ReadRowsQuery( + row_ranges=RowRange(start_key_closed=start_key, end_key_closed=end_key) ) - expected_result = _ReadRowsRequestPB(table_name=table_name) - row_range = RowRange(start_key_closed=start_key, end_key_closed=end_key) - expected_result.rows.row_ranges.append(row_range) assert result == expected_result @@ -1635,22 +1290,18 @@ def test__create_row_request_with_filter(): from google.cloud.bigtable.table import _create_row_request from google.cloud.bigtable.row_filters import RowSampleFilter - table_name = "table_name" row_filter = RowSampleFilter(0.33) - result = _create_row_request(table_name, filter_=row_filter) - expected_result = _ReadRowsRequestPB( - table_name=table_name, filter=row_filter._to_pb() - ) + result = _create_row_request(filter_=row_filter) + expected_result = ReadRowsQuery(row_filter=row_filter) assert result == expected_result def test__create_row_request_with_limit(): from google.cloud.bigtable.table import _create_row_request - table_name = "table_name" limit = 1337 - result = _create_row_request(table_name, limit=limit) - expected_result = _ReadRowsRequestPB(table_name=table_name, rows_limit=limit) + result = _create_row_request(limit=limit) + expected_result = ReadRowsQuery(limit=limit) assert result == expected_result @@ -1658,32 +1309,12 @@ def test__create_row_request_with_row_set(): from google.cloud.bigtable.table import _create_row_request from google.cloud.bigtable.row_set import RowSet - table_name = "table_name" row_set = RowSet() - result = _create_row_request(table_name, row_set=row_set) - expected_result = _ReadRowsRequestPB(table_name=table_name) - assert result == expected_result - - -def test__create_row_request_with_app_profile_id(): - from google.cloud.bigtable.table import _create_row_request - - table_name = "table_name" - limit = 1337 - app_profile_id = "app-profile-id" - result = _create_row_request(table_name, limit=limit, app_profile_id=app_profile_id) - expected_result = _ReadRowsRequestPB( - table_name=table_name, rows_limit=limit, app_profile_id=app_profile_id - ) + result = _create_row_request(row_set=row_set) + expected_result = ReadRowsQuery() assert result == expected_result -def _ReadRowsRequestPB(*args, **kw): - from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 - - return messages_v2_pb2.ReadRowsRequest(*args, **kw) - - def test_cluster_state___eq__(): from 
google.cloud.bigtable.enums import Table as enum_table from google.cloud.bigtable.table import ClusterState From 7ee7537ccc808ac84cc0f602187a28b5ccbb9556 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Thu, 26 Feb 2026 20:43:14 +0000 Subject: [PATCH 2/6] Closed generator on cancel in PartialRowsData --- google/cloud/bigtable/row_data.py | 7 ++--- tests/system/v2_client/test_data_api.py | 1 + tests/unit/v2_client/test_row_data.py | 35 ++++++++++++++++++------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py index 7bdd17304..e5513139a 100644 --- a/google/cloud/bigtable/row_data.py +++ b/google/cloud/bigtable/row_data.py @@ -84,7 +84,7 @@ def __init__(self, generator): def cancel(self): """Cancels the iterator, closing the stream.""" - self._cancelled = True + self._generator.close() def consume_all(self, max_loops=None): """Consume the streamed responses until there are no more. @@ -107,10 +107,7 @@ def __iter__(self): """ try: for row in self._generator: - if self._cancelled: - return - else: - yield PartialRowData._from_data_client_row(row) + yield PartialRowData._from_data_client_row(row) # Any exception from the generator should cancel the iterator. A # timeout, defined by catching a DeadlineExceeded, should be reraised diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 17a606717..68fe53fdb 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -920,6 +920,7 @@ def test_table_read_rows_retry_timeout_mid_stream( rows_data = data_table_read_rows_retry_tests.read_rows( retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) ) + pytest.fail(str(type(rows_data._generator))) with pytest.raises(exceptions.RetryError): rows_data.consume_all() diff --git a/tests/unit/v2_client/test_row_data.py b/tests/unit/v2_client/test_row_data.py index 11e4b0d65..4687629d4 100644 --- a/tests/unit/v2_client/test_row_data.py +++ b/tests/unit/v2_client/test_row_data.py @@ -466,12 +466,22 @@ def _partial_rows_data_consume_all(yrd): return [row.row_key for row in yrd] -def _make_generator(rows): - return (row for row in rows) +def _make_generator(rows, error=None): + for row in rows: + if error: + raise error + else: + yield row + + +def _assert_generator_closed(generator): + with pytest.raises(StopIteration): + next(generator) def test_partial_rows_data_consume_all(): - partial_rows_data = _make_partial_rows_data(_make_generator(ROWS)) + generator = _make_generator(ROWS) + partial_rows_data = _make_partial_rows_data(generator) partial_rows_data.consume_all() row1 = _make_partial_row_data(ROW_KEY) @@ -493,9 +503,12 @@ def test_partial_rows_data_consume_all(): row3.row_key: row3, } + _assert_generator_closed(generator) + def test_partial_rows_data_cancel(): - partial_rows_data = _make_partial_rows_data(_make_generator(ROWS)) + generator = _make_generator(ROWS) + partial_rows_data = _make_partial_rows_data(generator) row_data = [] count = 0 @@ -511,18 +524,22 @@ def test_partial_rows_data_cancel(): } assert row_data == [row1] + # We should have closed the generator, so there should be no more + # elements left in there even though we haven't iterated through all the rows. 
+ _assert_generator_closed(generator) + def test_partial_rows_data_deadline_exceeded(): from google.api_core import exceptions - def generator_w_error(): - raise exceptions.DeadlineExceeded("Boom") - yield 1 + generator = _make_generator(ROWS, error=exceptions.DeadlineExceeded("Operation timed out.")) - partial_rows_data = _make_partial_rows_data(generator_w_error()) + partial_rows_data = _make_partial_rows_data(generator) with pytest.raises(exceptions.RetryError): list(partial_rows_data) - assert partial_rows_data._cancelled + + # An exception should close the generator. + _assert_generator_closed(generator) def _make_cell_pb(value): From 05c1be6a6ae56565b1007819710e7ddd32c948d1 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Thu, 26 Feb 2026 20:45:02 +0000 Subject: [PATCH 3/6] linting --- tests/unit/v2_client/test_row_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/unit/v2_client/test_row_data.py b/tests/unit/v2_client/test_row_data.py index 4687629d4..89ce94a67 100644 --- a/tests/unit/v2_client/test_row_data.py +++ b/tests/unit/v2_client/test_row_data.py @@ -532,7 +532,9 @@ def test_partial_rows_data_cancel(): def test_partial_rows_data_deadline_exceeded(): from google.api_core import exceptions - generator = _make_generator(ROWS, error=exceptions.DeadlineExceeded("Operation timed out.")) + generator = _make_generator( + ROWS, error=exceptions.DeadlineExceeded("Operation timed out.") + ) partial_rows_data = _make_partial_rows_data(generator) with pytest.raises(exceptions.RetryError): From 5bec1077864c8a7c04ef0fceff953827b0c63b8b Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Fri, 27 Feb 2026 17:10:49 +0000 Subject: [PATCH 4/6] fixed system test --- tests/system/v2_client/test_data_api.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 68fe53fdb..17a606717 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -920,7 +920,6 @@ def test_table_read_rows_retry_timeout_mid_stream( rows_data = data_table_read_rows_retry_tests.read_rows( retry=DEFAULT_RETRY_READ_ROWS.with_deadline(10.0) ) - pytest.fail(str(type(rows_data._generator))) with pytest.raises(exceptions.RetryError): rows_data.consume_all() From 7d658f2637aa0b2cb9baecd33709cb8990623226 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 3 Mar 2026 15:27:01 +0000 Subject: [PATCH 5/6] feedback --- google/cloud/bigtable/row_data.py | 10 +++------- tests/system/v2_client/test_data_api.py | 22 +++++++++++++++++++++- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py index e5513139a..19e8b213a 100644 --- a/google/cloud/bigtable/row_data.py +++ b/google/cloud/bigtable/row_data.py @@ -72,16 +72,14 @@ class PartialRowsData(object): :param read_method: ``ReadRows`` method. :type generator: :class:`Iterable[Row]` - :param generator: The `Row` iterator from :meth:`Table.read_rows` + :param generator: The `Row` iterator from :meth:`Table.read_rows`. This is not intended + to be created directly. """ def __init__(self, generator): self._generator = generator self.rows = {} - # Flag to stop iteration, for any reason not related to self.retry() - self._cancelled = False - def cancel(self): """Cancels the iterator, closing the stream.""" self._generator.close() @@ -94,9 +92,7 @@ def consume_all(self, max_loops=None): class as a generator instead. 
:type max_loops: int - :param max_loops: (Optional) Maximum number of times to try to consume - an additional ``ReadRowsResponse``. You can use this - to avoid long wait times. + :param max_loops: (Deprecated). This parameter does nothing. """ for row in self: self.rows[row.row_key] = row diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 17a606717..d58552a5a 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -659,7 +659,6 @@ def test_table_read_rows(data_table, rows_to_delete): expected_rows = {ROW_KEY: row_data, ROW_KEY_ALT: row_alt_data} assert rows_data.rows == expected_rows - def test_read_with_label_applied(data_table, rows_to_delete, skip_on_emulator): from google.cloud.bigtable.row_filters import ApplyLabelFilter from google.cloud.bigtable.row_filters import ColumnQualifierRegexFilter @@ -722,6 +721,27 @@ def _assert_data_table_read_rows_retry_correct(rows_data): ) +def test_table_read_rows_multiple_reads( + data_table_read_rows_retry_tests, +): + from types import SimpleNamespace + + rows_data = data_table_read_rows_retry_tests.read_rows() + first_iteration = SimpleNamespace() + first_iteration.rows = {} + + second_iteration = SimpleNamespace() + second_iteration.rows = {} + for item in rows_data: + first_iteration.rows[item.row_key] = item + + for item in rows_data: + second_iteration.rows[item.row_key] = item + + _assert_data_table_read_rows_retry_correct(first_iteration) + assert second_iteration.rows == {} + + def test_table_read_rows_retry_unretriable_error_establishing_stream( data_table_read_rows_retry_tests, ): From 119ea870c9c8db7f55b0a3cd4f17e5c06e25fb60 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 3 Mar 2026 22:06:48 +0000 Subject: [PATCH 6/6] linting --- tests/system/v2_client/test_data_api.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index d58552a5a..6b46ba498 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -659,6 +659,7 @@ def test_table_read_rows(data_table, rows_to_delete): expected_rows = {ROW_KEY: row_data, ROW_KEY_ALT: row_alt_data} assert rows_data.rows == expected_rows + def test_read_with_label_applied(data_table, rows_to_delete, skip_on_emulator): from google.cloud.bigtable.row_filters import ApplyLabelFilter from google.cloud.bigtable.row_filters import ColumnQualifierRegexFilter @@ -734,10 +735,10 @@ def test_table_read_rows_multiple_reads( second_iteration.rows = {} for item in rows_data: first_iteration.rows[item.row_key] = item - + for item in rows_data: second_iteration.rows[item.row_key] = item - + _assert_data_table_read_rows_retry_correct(first_iteration) assert second_iteration.rows == {}
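
Usage note (illustrative sketch, not part of the patch series): after these changes the
public surface of the v2 client is unchanged -- ``Table.read_rows`` still returns a
``PartialRowsData`` -- but the call is now routed through the data client's
``read_rows_stream`` generator, and each yielded data-client ``Row`` is converted with
``PartialRowData._from_data_client_row``. A minimal sketch of the caller-visible
behaviour; the project, instance, and table IDs below are placeholders, and an existing
table with readable rows is assumed:

    from google.cloud.bigtable import Client

    client = Client(project="project-id", admin=True)
    table = client.instance("instance-id").table("table-id")

    # Streaming read; iterating yields legacy PartialRowData objects.
    rows_data = table.read_rows(start_key=b"row_key1", end_key=b"row_key9", limit=10)
    for row in rows_data:
        print(row.row_key)

    # cancel() now closes the underlying generator, so any further
    # iteration over rows_data yields nothing.
    rows_data.cancel()

As exercised by ``test_table_read_rows`` above, the legacy ``retry`` argument is mapped
onto the data client call (its deadline becomes the attempt timeout), while retries
themselves happen inside the data client's stream.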