-
Notifications
You must be signed in to change notification settings - Fork 63
feat: Rerouted ReadRows to data client #1299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v3_staging
Are you sure you want to change the base?
Changes from all commits
1ea5c89
7ee7537
05c1be6
5bec107
7d658f2
119ea87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,17 +15,9 @@ | |
| """Container for Google Cloud Bigtable Cells and Streaming Row Contents.""" | ||
|
|
||
|
|
||
| import copy | ||
|
|
||
| import grpc # type: ignore | ||
| import warnings | ||
| from google.api_core import exceptions | ||
| from google.api_core import retry | ||
| from google.cloud._helpers import _to_bytes # type: ignore | ||
|
|
||
| from google.cloud.bigtable.row_merger import _RowMerger, _State | ||
| from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2 | ||
| from google.cloud.bigtable_v2.types import data as data_v2_pb2 | ||
| from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData | ||
|
|
||
|
|
||
|
|
@@ -60,36 +52,7 @@ class InvalidRetryRequest(RuntimeError): | |
| """Exception raised when retry request is invalid.""" | ||
|
|
||
|
|
||
| RETRYABLE_INTERNAL_ERROR_MESSAGES = ( | ||
| "rst_stream", | ||
| "rst stream", | ||
| "received unexpected eos on data frame from server", | ||
| ) | ||
| """Internal error messages that can be retried during read row and mutation.""" | ||
|
|
||
|
|
||
| def _retriable_internal_server_error(exc): | ||
| """ | ||
| Return True if the internal server error is retriable. | ||
| """ | ||
| return isinstance(exc, exceptions.InternalServerError) and any( | ||
| retryable_message in exc.message.lower() | ||
| for retryable_message in RETRYABLE_INTERNAL_ERROR_MESSAGES | ||
| ) | ||
|
|
||
|
|
||
| def _retry_read_rows_exception(exc): | ||
| """Return True if the exception is retriable for read row requests.""" | ||
| if isinstance(exc, grpc.RpcError): | ||
| exc = exceptions.from_grpc_error(exc) | ||
|
|
||
| return _retriable_internal_server_error(exc) or isinstance( | ||
| exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded) | ||
| ) | ||
|
|
||
|
|
||
| DEFAULT_RETRY_READ_ROWS = retry.Retry( | ||
| predicate=_retry_read_rows_exception, | ||
| initial=1.0, | ||
| maximum=15.0, | ||
| multiplier=2.0, | ||
|
|
@@ -108,94 +71,18 @@ class PartialRowsData(object): | |
| :type read_method: :class:`client._table_data_client.read_rows` | ||
| :param read_method: ``ReadRows`` method. | ||
|
|
||
| :type request: :class:`data_messages_v2_pb2.ReadRowsRequest` | ||
| :param request: The ``ReadRowsRequest`` message used to create a | ||
| ReadRowsResponse iterator. If the iterator fails, a new | ||
| iterator is created, allowing the scan to continue from | ||
| the point just beyond the last successfully read row, | ||
| identified by self.last_scanned_row_key. The retry happens | ||
| inside of the Retry class, using a predicate for the | ||
| expected exceptions during iteration. | ||
|
|
||
| :type retry: :class:`~google.api_core.retry.Retry` | ||
| :param retry: (Optional) Retry delay and deadline arguments. To override, | ||
| the default value :attr:`DEFAULT_RETRY_READ_ROWS` can be | ||
| used and modified with the | ||
| :meth:`~google.api_core.retry.Retry.with_delay` method | ||
| or the | ||
| :meth:`~google.api_core.retry.Retry.with_deadline` method. | ||
| :type generator: :class:`Iterable[Row]` | ||
| :param generator: The `Row` iterator from :meth:`Table.read_rows`. This is not intended | ||
| to be created directly. | ||
| """ | ||
|
|
||
| NEW_ROW = "New row" # No cells yet complete for row | ||
| ROW_IN_PROGRESS = "Row in progress" # Some cells complete for row | ||
| CELL_IN_PROGRESS = "Cell in progress" # Incomplete cell for row | ||
|
|
||
| STATE_NEW_ROW = 1 | ||
| STATE_ROW_IN_PROGRESS = 2 | ||
| STATE_CELL_IN_PROGRESS = 3 | ||
|
|
||
| read_states = { | ||
| STATE_NEW_ROW: NEW_ROW, | ||
| STATE_ROW_IN_PROGRESS: ROW_IN_PROGRESS, | ||
| STATE_CELL_IN_PROGRESS: CELL_IN_PROGRESS, | ||
| } | ||
|
|
||
| def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS): | ||
| # Counter for rows returned to the user | ||
| self._counter = 0 | ||
| self._row_merger = _RowMerger() | ||
|
|
||
| # May be cached from previous response | ||
| self.last_scanned_row_key = None | ||
| self.read_method = read_method | ||
| self.request = request | ||
| self.retry = retry | ||
|
|
||
| # The `timeout` parameter must be somewhat greater than the value | ||
| # contained in `self.retry`, in order to avoid race-like condition and | ||
| # allow registering the first deadline error before invoking the retry. | ||
| # Otherwise there is a risk of entering an infinite loop that resets | ||
| # the timeout counter just before it being triggered. The increment | ||
| # by 1 second here is customary but should not be much less than that. | ||
| self.response_iterator = read_method( | ||
| request, timeout=self.retry._deadline + 1, retry=self.retry | ||
| ) | ||
|
|
||
| def __init__(self, generator): | ||
| self._generator = generator | ||
| self.rows = {} | ||
|
|
||
| # Flag to stop iteration, for any reason not related to self.retry() | ||
| self._cancelled = False | ||
|
|
||
| @property | ||
| def state(self): # pragma: NO COVER | ||
| """ | ||
| DEPRECATED: this property is deprecated and will be removed in the | ||
| future. | ||
| """ | ||
| warnings.warn( | ||
| "`PartialRowsData#state()` is deprecated and will be removed in the future", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| # Best effort: try to map internal RowMerger states to old strings for | ||
| # backwards compatibility | ||
| internal_state = self._row_merger.state | ||
| if internal_state == _State.ROW_START: | ||
| return self.NEW_ROW | ||
| # note: _State.CELL_START, _State.CELL_COMPLETE are transient states | ||
| # and will not be visible in between chunks | ||
| elif internal_state == _State.CELL_IN_PROGRESS: | ||
| return self.CELL_IN_PROGRESS | ||
| elif internal_state == _State.ROW_COMPLETE: | ||
| return self.NEW_ROW | ||
| else: | ||
| raise RuntimeError("unexpected internal state: " + self._) | ||
|
|
||
| def cancel(self): | ||
| """Cancels the iterator, closing the stream.""" | ||
| self._cancelled = True | ||
| self.response_iterator.cancel() | ||
| self._generator.close() | ||
|
|
||
| def consume_all(self, max_loops=None): | ||
| """Consume the streamed responses until there are no more. | ||
|
|
@@ -205,176 +92,25 @@ def consume_all(self, max_loops=None): | |
| class as a generator instead. | ||
|
|
||
| :type max_loops: int | ||
| :param max_loops: (Optional) Maximum number of times to try to consume | ||
| an additional ``ReadRowsResponse``. You can use this | ||
| to avoid long wait times. | ||
| :param max_loops: (Deprecated). This parameter does nothing. | ||
| """ | ||
| for row in self: | ||
| self.rows[row.row_key] = row | ||
|
|
||
| def _create_retry_request(self): | ||
| """Helper for :meth:`__iter__`.""" | ||
| req_manager = _ReadRowsRequestManager( | ||
| self.request, self.last_scanned_row_key, self._counter | ||
| ) | ||
| return req_manager.build_updated_request() | ||
|
|
||
| def _on_error(self, exc): | ||
| """Helper for :meth:`__iter__`.""" | ||
| # restart the read scan from AFTER the last successfully read row | ||
| retry_request = self.request | ||
| if self.last_scanned_row_key: | ||
| retry_request = self._create_retry_request() | ||
|
|
||
| self._row_merger = _RowMerger(self._row_merger.last_seen_row_key) | ||
| self.response_iterator = self.read_method(retry_request, retry=self.retry) | ||
|
|
||
| def _read_next(self): | ||
| """Helper for :meth:`__iter__`.""" | ||
| return next(self.response_iterator) | ||
|
|
||
| def _read_next_response(self): | ||
| """Helper for :meth:`__iter__`.""" | ||
| resp_protoplus = self.retry(self._read_next, on_error=self._on_error)() | ||
| # unwrap the underlying protobuf, there is a significant amount of | ||
| # overhead that protoplus imposes for very little gain. The protos | ||
| # are not user visible, so we just use the raw protos for merging. | ||
| return data_messages_v2_pb2.ReadRowsResponse.pb(resp_protoplus) | ||
|
|
||
| def __iter__(self): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if you try to iterate over this multiple times? I assume since the inner generator is exhausted, it would just yield nothing? Is that similar to the old implementation? Also, what do we expect to happen if we keep iterating after cancellation?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe that in the original, if we keep iterating after cancellation, the [comment truncated in source]
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As I previously suspected, iterating over the same [object yields nothing once exhausted — wording incomplete in source]. I will add this to this branch to test that the same behavior occurs. |
||
| """Consume the ``ReadRowsResponse`` s from the stream. | ||
| Read the rows and yield each to the reader | ||
|
|
||
| Parse the response and its chunks into a new/existing row in | ||
| :attr:`_rows`. Rows are returned in order by row key. | ||
| """Consume the ``Row`` s from the stream. | ||
| Convert them to ``PartialRowData`` and yield each to the reader. | ||
| """ | ||
| while not self._cancelled: | ||
| try: | ||
| response = self._read_next_response() | ||
| except StopIteration: | ||
| self._row_merger.finalize() | ||
| break | ||
| except InvalidRetryRequest: | ||
| self._cancelled = True | ||
| break | ||
|
|
||
| for row in self._row_merger.process_chunks(response): | ||
| self.last_scanned_row_key = self._row_merger.last_seen_row_key | ||
| self._counter += 1 | ||
|
|
||
| yield row | ||
|
|
||
| if self._cancelled: | ||
| break | ||
| # The last response might not have generated any rows, but it | ||
| # could've updated last_scanned_row_key | ||
| self.last_scanned_row_key = self._row_merger.last_seen_row_key | ||
|
|
||
|
|
||
| class _ReadRowsRequestManager(object): | ||
| """Update the ReadRowsRequest message in case of failures by | ||
| filtering the already read keys. | ||
|
|
||
| :type message: class:`data_messages_v2_pb2.ReadRowsRequest` | ||
| :param message: Original ReadRowsRequest containing all of the parameters | ||
| of API call | ||
|
|
||
| :type last_scanned_key: bytes | ||
| :param last_scanned_key: last successfully scanned key | ||
|
|
||
| :type rows_read_so_far: int | ||
| :param rows_read_so_far: total no of rows successfully read so far. | ||
| this will be used for updating rows_limit | ||
|
|
||
| """ | ||
|
|
||
| def __init__(self, message, last_scanned_key, rows_read_so_far): | ||
| self.message = message | ||
| self.last_scanned_key = last_scanned_key | ||
| self.rows_read_so_far = rows_read_so_far | ||
|
|
||
| def build_updated_request(self): | ||
| """Updates the given message request as per last scanned key""" | ||
|
|
||
| resume_request = data_messages_v2_pb2.ReadRowsRequest() | ||
| data_messages_v2_pb2.ReadRowsRequest.copy_from(resume_request, self.message) | ||
|
|
||
| if self.message.rows_limit != 0: | ||
| row_limit_remaining = self.message.rows_limit - self.rows_read_so_far | ||
| if row_limit_remaining > 0: | ||
| resume_request.rows_limit = row_limit_remaining | ||
| else: | ||
| raise InvalidRetryRequest | ||
|
|
||
| # if neither RowSet.row_keys nor RowSet.row_ranges currently exist, | ||
| # add row_range that starts with last_scanned_key as start_key_open | ||
| # to request only rows that have not been returned yet | ||
| if "rows" not in self.message: | ||
| row_range = data_v2_pb2.RowRange(start_key_open=self.last_scanned_key) | ||
| resume_request.rows = data_v2_pb2.RowSet(row_ranges=[row_range]) | ||
| else: | ||
| row_keys = self._filter_rows_keys() | ||
| row_ranges = self._filter_row_ranges() | ||
|
|
||
| if len(row_keys) == 0 and len(row_ranges) == 0: | ||
| # Avoid sending empty row_keys and row_ranges | ||
| # if that was not the intention | ||
| raise InvalidRetryRequest | ||
|
|
||
| resume_request.rows = data_v2_pb2.RowSet( | ||
| row_keys=row_keys, row_ranges=row_ranges | ||
| ) | ||
| return resume_request | ||
|
|
||
| def _filter_rows_keys(self): | ||
| """Helper for :meth:`build_updated_request`""" | ||
| return [ | ||
| row_key | ||
| for row_key in self.message.rows.row_keys | ||
| if row_key > self.last_scanned_key | ||
| ] | ||
|
|
||
| def _filter_row_ranges(self): | ||
| """Helper for :meth:`build_updated_request`""" | ||
| new_row_ranges = [] | ||
|
|
||
| for row_range in self.message.rows.row_ranges: | ||
| # if current end_key (open or closed) is set, return its value, | ||
| # if not, set to empty string (''). | ||
| # NOTE: Empty string in end_key means "end of table" | ||
| end_key = self._end_key_set(row_range) | ||
| # if end_key is already read, skip to the next row_range | ||
| if end_key and self._key_already_read(end_key): | ||
| continue | ||
|
|
||
| # if current start_key (open or closed) is set, return its value, | ||
| # if not, then set to empty string ('') | ||
| # NOTE: Empty string in start_key means "beginning of table" | ||
| start_key = self._start_key_set(row_range) | ||
|
|
||
| # if start_key was already read or doesn't exist, | ||
| # create a row_range with last_scanned_key as start_key_open | ||
| # to be passed to retry request | ||
| retry_row_range = row_range | ||
| if self._key_already_read(start_key): | ||
| retry_row_range = copy.deepcopy(row_range) | ||
| retry_row_range.start_key_closed = _to_bytes("") | ||
| retry_row_range.start_key_open = self.last_scanned_key | ||
|
|
||
| new_row_ranges.append(retry_row_range) | ||
|
|
||
| return new_row_ranges | ||
|
|
||
| def _key_already_read(self, key): | ||
| """Helper for :meth:`_filter_row_ranges`""" | ||
| return key <= self.last_scanned_key | ||
|
|
||
| @staticmethod | ||
| def _start_key_set(row_range): | ||
| """Helper for :meth:`_filter_row_ranges`""" | ||
| return row_range.start_key_open or row_range.start_key_closed | ||
|
|
||
| @staticmethod | ||
| def _end_key_set(row_range): | ||
| """Helper for :meth:`_filter_row_ranges`""" | ||
| return row_range.end_key_open or row_range.end_key_closed | ||
| try: | ||
| for row in self._generator: | ||
| yield PartialRowData._from_data_client_row(row) | ||
|
|
||
| # Any exception from the generator should cancel the iterator. A | ||
| # timeout, defined by catching a DeadlineExceeded, should be reraised | ||
| # as a RetryError instead. | ||
| except exceptions.DeadlineExceeded as e: | ||
| self.cancel() | ||
| raise exceptions.RetryError(e.message, e.__cause__) | ||
| except Exception as e: | ||
| self.cancel() | ||
| raise e | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like max_loops isn't used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what to do here because it wasn't used in the original implementation of the function either, so I assume it was there to prevent a breaking change. Now that we are able to make a breaking change, I should remove it, right?