Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions google/cloud/storage/asyncio/retry/reads_resumption_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,24 @@ def update_state_from_response(
self, response: storage_v2.BidiReadObjectResponse, state: Dict[str, Any]
) -> None:
"""Processes a server response, performs integrity checks, and updates state."""

# Capture read_handle if provided.
if response.read_handle:
state["read_handle"] = response.read_handle
proto = getattr(response, "_pb", response)
if proto.read_handle:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency and robustness, it's better to use HasField() to check for the presence of message fields on raw protobuf objects, as you've done for read_range. While checking the truthiness of proto.read_handle might work, HasField("read_handle") is the explicitly supported method and avoids any ambiguity.

Suggested change
if proto.read_handle:
if proto.HasField("read_handle"):

state["read_handle"] = storage_v2.BidiReadHandle(
handle=proto.read_handle.handle
)

download_states = state["download_states"]

for object_data_range in response.object_data_ranges:
# Ignore empty ranges or ranges for IDs not in our state
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't delete these comments

# (e.g., from a previously cancelled request on the same stream).
if not object_data_range.read_range:
for object_data_range in proto.object_data_ranges:
if not object_data_range.HasField("read_range"):
logger.warning(
"Received response with missing read_range field; ignoring."
)
continue

read_id = object_data_range.read_range.read_id
read_range_pb = object_data_range.read_range
read_id = read_range_pb.read_id

if read_id not in download_states:
logger.warning(
f"Received data for unknown or stale read_id {read_id}; ignoring."
Expand All @@ -107,7 +108,7 @@ def update_state_from_response(
read_state = download_states[read_id]

# Offset Verification
chunk_offset = object_data_range.read_range.read_offset
chunk_offset = read_range_pb.read_offset
if chunk_offset != read_state.next_expected_offset:
raise DataCorruption(
response,
Expand All @@ -116,11 +117,11 @@ def update_state_from_response(
)

# Checksum Verification
# We must validate data before updating state or writing to buffer.
data = object_data_range.checksummed_data.content
server_checksum = object_data_range.checksummed_data.crc32c
checksummed_data = object_data_range.checksummed_data
data = checksummed_data.content

if server_checksum is not None:
if checksummed_data.HasField("crc32c"):
server_checksum = checksummed_data.crc32c
client_checksum = int.from_bytes(Checksum(data).digest(), "big")
if server_checksum != client_checksum:
raise DataCorruption(
Expand Down
17 changes: 9 additions & 8 deletions tests/perf/microbenchmarks/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import socket
import psutil

_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show


def publish_benchmark_extra_info(
benchmark: Any,
Expand All @@ -28,7 +29,6 @@ def publish_benchmark_extra_info(
download_bytes_list: Optional[List[int]] = None,
duration: Optional[int] = None,
) -> None:

"""
Helper function to publish benchmark parameters to the extra_info property.
"""
Expand All @@ -48,14 +48,15 @@ def publish_benchmark_extra_info(
benchmark.group = benchmark_group

if download_bytes_list is not None:
assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
assert (
duration is not None
), "Duration must be provided if total_bytes_transferred is provided."
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
median_throughput = statistics.median(throughputs_list)


else:
object_size = params.file_size_bytes
num_files = params.num_files
Expand Down Expand Up @@ -211,13 +212,13 @@ def get_affinity(irq):

def get_primary_interface_name():
primary_ip = None

# 1. Determine the Local IP used for internet access
# We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# connect() to a public IP (Google DNS) to force route resolution
s.connect(('8.8.8.8', 80))
s.connect(("8.8.8.8", 80))
primary_ip = s.getsockname()[0]
except Exception:
# Fallback if no internet
Expand Down Expand Up @@ -248,7 +249,7 @@ def get_irq_affinity():
for irq in irqs:
affinity_str = get_affinity(irq)
if affinity_str != "N/A":
for part in affinity_str.split(','):
if '-' not in part:
for part in affinity_str.split(","):
if "-" not in part:
cpus.add(int(part))
return cpus
2 changes: 1 addition & 1 deletion tests/perf/microbenchmarks/time_based/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
@pytest.fixture
def workload_params(request):
params = request.param
files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)]
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
return params, files_names
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params):


def _download_files_worker(process_idx, filename, params, bucket_type):

if bucket_type == "zonal":
return worker_loop.run_until_complete(
_download_time_based_async(worker_client, filename, params)
Expand Down
39 changes: 23 additions & 16 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer):
writer._is_stream_open = True
writer.write_obj_stream = mock_appendable_writer["mock_stream"]

mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(persisted_size=100)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100)

size = await writer.state_lookup()

Expand Down Expand Up @@ -246,9 +246,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer
],
)
@pytest.mark.asyncio
async def test_append(
self, data_len, mock_appendable_writer
):
async def test_append(self, data_len, mock_appendable_writer):
"""Verify append orchestrates manager and drives the internal generator."""
# Arrange
writer = self._make_one(mock_appendable_writer["mock_client"])
Expand All @@ -272,10 +270,19 @@ async def test_append(
# Assert
expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES
assert writer.offset == data_len
assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES
assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES
assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks
assert writer.write_obj_stream.recv.await_count == expected_recv_count # Expect 1 recv per flush interval
assert (
writer.bytes_appended_since_last_flush
== data_len % _DEFAULT_FLUSH_INTERVAL_BYTES
)
assert (
writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES
)
assert writer.write_obj_stream.send.await_count == -(
-data_len // _MAX_CHUNK_SIZE_BYTES
) # Ceiling division for number of chunks
assert (
writer.write_obj_stream.recv.await_count == expected_recv_count
) # Expect 1 recv per flush interval

@pytest.mark.asyncio
async def test_append_recovery_reopens_stream(self, mock_appendable_writer):
Expand Down Expand Up @@ -339,9 +346,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer):
writer.write_obj_stream = mock_appendable_writer["mock_stream"]
writer.bytes_appended_since_last_flush = 100

mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(persisted_size=200)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200)

await writer.flush()

Expand Down Expand Up @@ -382,9 +389,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer):
writer.write_obj_stream = mock_appendable_writer["mock_stream"]

resource = storage_type.Object(size=999)
mock_appendable_writer["mock_stream"].recv.return_value = (
storage_type.BidiWriteObjectResponse(resource=resource)
)
mock_appendable_writer[
"mock_stream"
].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource)

res = await writer.finalize()

Expand Down