diff --git a/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py b/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py index 468954332..22e871519 100644 --- a/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py +++ b/google/cloud/storage/asyncio/retry/reads_resumption_strategy.py @@ -81,23 +81,24 @@ def update_state_from_response( self, response: storage_v2.BidiReadObjectResponse, state: Dict[str, Any] ) -> None: """Processes a server response, performs integrity checks, and updates state.""" - - # Capture read_handle if provided. - if response.read_handle: - state["read_handle"] = response.read_handle + proto = getattr(response, "_pb", response) + if proto.read_handle: + state["read_handle"] = storage_v2.BidiReadHandle( + handle=proto.read_handle.handle + ) download_states = state["download_states"] - for object_data_range in response.object_data_ranges: - # Ignore empty ranges or ranges for IDs not in our state - # (e.g., from a previously cancelled request on the same stream). - if not object_data_range.read_range: + for object_data_range in proto.object_data_ranges: + if not object_data_range.HasField("read_range"): logger.warning( "Received response with missing read_range field; ignoring." ) continue - read_id = object_data_range.read_range.read_id + read_range_pb = object_data_range.read_range + read_id = read_range_pb.read_id + if read_id not in download_states: logger.warning( f"Received data for unknown or stale read_id {read_id}; ignoring." @@ -107,7 +108,7 @@ def update_state_from_response( read_state = download_states[read_id] # Offset Verification - chunk_offset = object_data_range.read_range.read_offset + chunk_offset = read_range_pb.read_offset if chunk_offset != read_state.next_expected_offset: raise DataCorruption( response, @@ -116,11 +117,11 @@ def update_state_from_response( ) # Checksum Verification - # We must validate data before updating state or writing to buffer. - data = object_data_range.checksummed_data.content - server_checksum = object_data_range.checksummed_data.crc32c + checksummed_data = object_data_range.checksummed_data + data = checksummed_data.content - if server_checksum is not None: + if checksummed_data.HasField("crc32c"): + server_checksum = checksummed_data.crc32c client_checksum = int.from_bytes(Checksum(data).digest(), "big") if server_checksum != client_checksum: raise DataCorruption( diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py index edf398fe9..9e5609500 100644 --- a/tests/perf/microbenchmarks/_utils.py +++ b/tests/perf/microbenchmarks/_utils.py @@ -18,7 +18,8 @@ import socket import psutil -_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show +_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show + def publish_benchmark_extra_info( benchmark: Any, @@ -28,7 +29,6 @@ def publish_benchmark_extra_info( download_bytes_list: Optional[List[int]] = None, duration: Optional[int] = None, ) -> None: - """ Helper function to publish benchmark parameters to the extra_info property. """ @@ -48,14 +48,15 @@ def publish_benchmark_extra_info( benchmark.group = benchmark_group if download_bytes_list is not None: - assert duration is not None, "Duration must be provided if total_bytes_transferred is provided." + assert ( + duration is not None + ), "Duration must be provided if total_bytes_transferred is provided." throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list] min_throughput = min(throughputs_list) max_throughput = max(throughputs_list) mean_throughput = statistics.mean(throughputs_list) median_throughput = statistics.median(throughputs_list) - else: object_size = params.file_size_bytes num_files = params.num_files @@ -211,13 +212,13 @@ def get_affinity(irq): def get_primary_interface_name(): primary_ip = None - + # 1. Determine the Local IP used for internet access # We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # connect() to a public IP (Google DNS) to force route resolution - s.connect(('8.8.8.8', 80)) + s.connect(("8.8.8.8", 80)) primary_ip = s.getsockname()[0] except Exception: # Fallback if no internet @@ -248,7 +249,7 @@ def get_irq_affinity(): for irq in irqs: affinity_str = get_affinity(irq) if affinity_str != "N/A": - for part in affinity_str.split(','): - if '-' not in part: + for part in affinity_str.split(","): + if "-" not in part: cpus.add(int(part)) return cpus diff --git a/tests/perf/microbenchmarks/time_based/conftest.py b/tests/perf/microbenchmarks/time_based/conftest.py index bcd186d7b..5c0c787f0 100644 --- a/tests/perf/microbenchmarks/time_based/conftest.py +++ b/tests/perf/microbenchmarks/time_based/conftest.py @@ -17,5 +17,5 @@ @pytest.fixture def workload_params(request): params = request.param - files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)] + files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)] return params, files_names diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index f2b84158b..17e6d48fd 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params): def _download_files_worker(process_idx, filename, params, bucket_type): - if bucket_type == "zonal": return worker_loop.run_until_complete( _download_time_based_async(worker_client, filename, params) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 0c8fe4375..9eb701ed9 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer): writer._is_stream_open = True writer.write_obj_stream = mock_appendable_writer["mock_stream"] - mock_appendable_writer["mock_stream"].recv.return_value = ( - storage_type.BidiWriteObjectResponse(persisted_size=100) - ) + mock_appendable_writer[ + "mock_stream" + ].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100) size = await writer.state_lookup() @@ -246,9 +246,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer ], ) @pytest.mark.asyncio - async def test_append( - self, data_len, mock_appendable_writer - ): + async def test_append(self, data_len, mock_appendable_writer): """Verify append orchestrates manager and drives the internal generator.""" # Arrange writer = self._make_one(mock_appendable_writer["mock_client"]) @@ -272,10 +270,19 @@ async def test_append( # Assert expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES assert writer.offset == data_len - assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks - assert writer.write_obj_stream.recv.await_count == expected_recv_count # Expect 1 recv per flush interval + assert ( + writer.bytes_appended_since_last_flush + == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert ( + writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert writer.write_obj_stream.send.await_count == -( + -data_len // _MAX_CHUNK_SIZE_BYTES + ) # Ceiling division for number of chunks + assert ( + writer.write_obj_stream.recv.await_count == expected_recv_count + ) # Expect 1 recv per flush interval @pytest.mark.asyncio async def test_append_recovery_reopens_stream(self, mock_appendable_writer): @@ -339,9 +346,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer): writer.write_obj_stream = mock_appendable_writer["mock_stream"] writer.bytes_appended_since_last_flush = 100 - mock_appendable_writer["mock_stream"].recv.return_value = ( - storage_type.BidiWriteObjectResponse(persisted_size=200) - ) + mock_appendable_writer[ + "mock_stream" + ].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200) await writer.flush() @@ -382,9 +389,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer): writer.write_obj_stream = mock_appendable_writer["mock_stream"] resource = storage_type.Object(size=999) - mock_appendable_writer["mock_stream"].recv.return_value = ( - storage_type.BidiWriteObjectResponse(resource=resource) - ) + mock_appendable_writer[ + "mock_stream" + ].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource) res = await writer.finalize()